• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.17
/source/dnode/vnode/src/vnd/vnodeOpen.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "sync.h"
17
#include "tcs.h"
18
#include "tq.h"
19
#include "tsdb.h"
20
#include "vnd.h"
21

22
void vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) {
242,429✔
23
  if (pTfs) {
242,429!
24
    SDiskID diskId = {0};
242,453✔
25
    diskId.id = diskPrimary;
242,453✔
26
    snprintf(buf, bufLen - 1, "%s%s%s", tfsGetDiskPath(pTfs, diskId), TD_DIRSEP, relPath);
242,453✔
27
  } else {
28
    snprintf(buf, bufLen - 1, "%s", relPath);
×
29
  }
30
  buf[bufLen - 1] = '\0';
242,492✔
31
}
242,492✔
32

33
static int32_t vnodeMkDir(STfs *pTfs, const char *path) {
21,568✔
34
  if (pTfs) {
21,568!
35
    return tfsMkdirRecur(pTfs, path);
21,571✔
36
  } else {
37
    return taosMkDir(path);
×
38
  }
39
}
40

41
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs) {
9,599✔
42
  int32_t    code = 0;
9,599✔
43
  SVnodeInfo info = {0};
9,599✔
44
  char       dir[TSDB_FILENAME_LEN] = {0};
9,599✔
45

46
  // check config
47
  if ((code = vnodeCheckCfg(pCfg)) < 0) {
9,599!
48
    vError("vgId:%d, failed to create vnode since:%s", pCfg->vgId, tstrerror(code));
×
49
    return code;
×
50
  }
51

52
  // create vnode env
53
  if (vnodeMkDir(pTfs, path)) {
9,666!
54
    vError("vgId:%d, failed to prepare vnode dir since %s, path: %s", pCfg->vgId, strerror(ERRNO), path);
×
55
    return TAOS_SYSTEM_ERROR(ERRNO);
×
56
  }
57
  vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
9,682✔
58

59
  if (pCfg) {
9,682!
60
    info.config = *pCfg;
9,682✔
61
  } else {
62
    info.config = vnodeCfgDefault;
×
63
  }
64
  info.state.committed = -1;
9,682✔
65
  info.state.applied = -1;
9,682✔
66
  info.state.commitID = 0;
9,682✔
67

68
  SVnodeInfo oldInfo = {0};
9,682✔
69
  oldInfo.config = vnodeCfgDefault;
9,682✔
70
  if (vnodeLoadInfo(dir, &oldInfo) == 0) {
9,682!
71
    code = (oldInfo.config.dbId == info.config.dbId) ? 0 : TSDB_CODE_VND_ALREADY_EXIST_BUT_NOT_MATCH;
×
72
    if (code == 0) {
×
73
      vWarn("vgId:%d, vnode config info already exists at %s.", oldInfo.config.vgId, dir);
×
74
    } else {
75
      vError("vgId:%d, vnode config info already exists at %s. oldDbId:%" PRId64 "(%s) at cluster:%" PRId64
×
76
             ", newDbId:%" PRId64 "(%s) at cluser:%" PRId64 ", code:%s",
77
             oldInfo.config.vgId, dir, oldInfo.config.dbId, oldInfo.config.dbname,
78
             oldInfo.config.syncCfg.nodeInfo[oldInfo.config.syncCfg.myIndex].clusterId, info.config.dbId,
79
             info.config.dbname, info.config.syncCfg.nodeInfo[info.config.syncCfg.myIndex].clusterId, tstrerror(code));
80
    }
81
    return code;
×
82
  }
83

84
  vInfo("vgId:%d, save config while create", info.config.vgId);
9,682✔
85
  if ((code = vnodeSaveInfo(dir, &info)) < 0 || (code = vnodeCommitInfo(dir)) < 0) {
9,682!
86
    vError("vgId:%d, failed to save vnode config since %s", pCfg ? pCfg->vgId : 0, tstrerror(code));
×
87
    return code;
×
88
  }
89

90
  vInfo("vgId:%d, vnode is created", info.config.vgId);
9,682✔
91
  return 0;
9,682✔
92
}
93

94
bool vnodeShouldRemoveWal(SVnode *pVnode) { return pVnode->config.walCfg.clearFiles == 1; }
1,229✔
95

96
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs) {
1,229✔
97
  SVnodeInfo info = {0};
1,229✔
98
  char       dir[TSDB_FILENAME_LEN] = {0};
1,229✔
99
  int32_t    ret = 0;
1,229✔
100

101
  vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
1,229✔
102

103
  ret = vnodeLoadInfo(dir, &info);
1,229✔
104
  if (ret < 0) {
1,229!
105
    vError("vgId:%d, failed to read vnode config from %s since %s", pReq->vgId, path, tstrerror(terrno));
×
106
    return ret;
×
107
  }
108

109
  SSyncCfg *pCfg = &info.config.syncCfg;
1,229✔
110

111
  pCfg->replicaNum = 0;
1,229✔
112
  pCfg->totalReplicaNum = 0;
1,229✔
113
  memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
1,229✔
114

115
  for (int i = 0; i < pReq->replica; ++i) {
4,775✔
116
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
3,546✔
117
    pNode->nodeId = pReq->replicas[i].id;
3,546✔
118
    pNode->nodePort = pReq->replicas[i].port;
3,546✔
119
    tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
3,546✔
120
    pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
3,546✔
121
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
3,546✔
122
    vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
3,546!
123
    pCfg->replicaNum++;
3,546✔
124
  }
125
  if (pReq->selfIndex != -1) {
1,229!
126
    pCfg->myIndex = pReq->selfIndex;
1,229✔
127
  }
128
  for (int i = pCfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
1,524✔
129
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
295✔
130
    pNode->nodeId = pReq->learnerReplicas[pCfg->totalReplicaNum].id;
295✔
131
    pNode->nodePort = pReq->learnerReplicas[pCfg->totalReplicaNum].port;
295✔
132
    pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
295✔
133
    tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[pCfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
295✔
134
    bool ret = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
295✔
135
    vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
295!
136
    pCfg->totalReplicaNum++;
295✔
137
  }
138
  pCfg->totalReplicaNum += pReq->replica;
1,229✔
139
  if (pReq->learnerSelfIndex != -1) {
1,229!
140
    pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
×
141
  }
142
  pCfg->changeVersion = pReq->changeVersion;
1,229✔
143

144
  if (info.config.walCfg.clearFiles) {
1,229!
145
    info.config.walCfg.clearFiles = 0;
×
146

147
    vInfo("vgId:%d, reset wal clearFiles", pReq->vgId);
×
148
  }
149

150
  vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", pReq->vgId,
1,229!
151
        pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex, pCfg->changeVersion);
152

153
  info.config.syncCfg = *pCfg;
1,229✔
154
  ret = vnodeSaveInfo(dir, &info);
1,229✔
155
  if (ret < 0) {
1,229!
156
    vError("vgId:%d, failed to save vnode config since %s", pReq->vgId, tstrerror(terrno));
×
157
    return ret;
×
158
  }
159

160
  ret = vnodeCommitInfo(dir);
1,229✔
161
  if (ret < 0) {
1,229!
162
    vError("vgId:%d, failed to commit vnode config since %s", pReq->vgId, tstrerror(terrno));
×
163
    return ret;
×
164
  }
165

166
  vInfo("vgId:%d, vnode config is saved", info.config.vgId);
1,229!
167
  return 0;
1,229✔
168
}
169

170
static int32_t vnodeVgroupIdLen(int32_t vgId) {
103✔
171
  char tmp[TSDB_FILENAME_LEN];
172
  (void)tsnprintf(tmp, TSDB_FILENAME_LEN, "%d", vgId);
103✔
173
  return strlen(tmp);
103✔
174
}
175

176
int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
28✔
177
                            int32_t diskPrimary, STfs *pTfs) {
178
  int32_t ret = 0;
28✔
179

180
  char oldRname[TSDB_FILENAME_LEN] = {0};
28✔
181
  char newRname[TSDB_FILENAME_LEN] = {0};
28✔
182
  char tsdbPath[TSDB_FILENAME_LEN] = {0};
28✔
183
  char tsdbFilePrefix[TSDB_FILENAME_LEN] = {0};
28✔
184
  snprintf(tsdbPath, TSDB_FILENAME_LEN, "%s%stsdb", srcPath, TD_DIRSEP);
28✔
185
  snprintf(tsdbFilePrefix, TSDB_FILENAME_LEN, "tsdb%sv", TD_DIRSEP);
28✔
186
  int32_t prefixLen = strlen(tsdbFilePrefix);
28✔
187

188
  STfsDir *tsdbDir = NULL;
28✔
189
  int32_t  tret = tfsOpendir(pTfs, tsdbPath, &tsdbDir);
28✔
190
  if (tsdbDir == NULL) {
28!
191
    return 0;
×
192
  }
193

194
  while (1) {
159✔
195
    const STfsFile *tsdbFile = tfsReaddir(tsdbDir);
187✔
196
    if (tsdbFile == NULL) break;
187✔
197
    if (tsdbFile->rname[0] == '\0') continue;
215!
198
    tstrncpy(oldRname, tsdbFile->rname, TSDB_FILENAME_LEN);
159✔
199

200
    char *tsdbFilePrefixPos = strstr(oldRname, tsdbFilePrefix);
159✔
201
    if (tsdbFilePrefixPos == NULL) continue;
159✔
202

203
    int32_t tsdbFileVgId = 0;
103✔
204
    ret = taosStr2int32(tsdbFilePrefixPos + prefixLen, &tsdbFileVgId);
103✔
205
    if (ret != 0) {
103!
206
      vError("vgId:%d, failed to get tsdb file vgid since %s", dstVgId, tstrerror(ret));
×
207
      tfsClosedir(tsdbDir);
×
208
      return ret;
×
209
    }
210

211
    if (tsdbFileVgId == srcVgId) {
103!
212
      char *tsdbFileSurfixPos = tsdbFilePrefixPos + prefixLen + vnodeVgroupIdLen(srcVgId);
103✔
213

214
      tsdbFilePrefixPos[prefixLen] = 0;
103✔
215
      snprintf(newRname, TSDB_FILENAME_LEN, "%s%d%s", oldRname, dstVgId, tsdbFileSurfixPos);
103✔
216
      vInfo("vgId:%d, rename file from %s to %s", dstVgId, tsdbFile->rname, newRname);
103!
217

218
      ret = tfsRename(pTfs, diskPrimary, tsdbFile->rname, newRname);
103✔
219
      if (ret != 0) {
103!
220
        vError("vgId:%d, failed to rename file from %s to %s since %s", dstVgId, tsdbFile->rname, newRname, terrstr());
×
221
        tfsClosedir(tsdbDir);
×
222
        return ret;
×
223
      }
224
    }
225
  }
226

227
  tfsClosedir(tsdbDir);
28✔
228

229
  vInfo("vgId:%d, rename dir from %s to %s", dstVgId, srcPath, dstPath);
28!
230
  ret = tfsRename(pTfs, diskPrimary, srcPath, dstPath);
28✔
231
  if (ret != 0) {
28!
232
    vError("vgId:%d, failed to rename dir from %s to %s since %s", dstVgId, srcPath, dstPath, terrstr());
×
233
  }
234
  return ret;
28✔
235
}
236

237
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq,
28✔
238
                            int32_t diskPrimary, STfs *pTfs) {
239
  SVnodeInfo info = {0};
28✔
240
  char       dir[TSDB_FILENAME_LEN] = {0};
28✔
241
  int32_t    ret = 0;
28✔
242

243
  vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
28✔
244

245
  ret = vnodeLoadInfo(dir, &info);
28✔
246
  if (ret < 0) {
28!
247
    vError("vgId:%d, failed to read vnode config from %s since %s", pReq->srcVgId, srcPath, tstrerror(terrno));
×
248
    return ret;
×
249
  }
250

251
  vInfo("vgId:%d, alter hashrange from [%u, %u] to [%u, %u]", pReq->srcVgId, info.config.hashBegin, info.config.hashEnd,
28!
252
        pReq->hashBegin, pReq->hashEnd);
253
  info.config.vgId = pReq->dstVgId;
28✔
254
  info.config.hashBegin = pReq->hashBegin;
28✔
255
  info.config.hashEnd = pReq->hashEnd;
28✔
256
  info.config.hashChange = true;
28✔
257
  info.config.walCfg.vgId = pReq->dstVgId;
28✔
258
  info.config.syncCfg.changeVersion = pReq->changeVersion;
28✔
259

260
  SSyncCfg *pCfg = &info.config.syncCfg;
28✔
261
  pCfg->myIndex = 0;
28✔
262
  pCfg->replicaNum = 1;
28✔
263
  pCfg->totalReplicaNum = 1;
28✔
264
  memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
28✔
265

266
  vInfo("vgId:%d, alter vnode replicas to 1", pReq->srcVgId);
28!
267
  SNodeInfo *pNode = &pCfg->nodeInfo[0];
28✔
268
  pNode->nodePort = tsServerPort;
28✔
269
  tstrncpy(pNode->nodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
28✔
270
  bool ret1 = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
28✔
271
  vInfo("vgId:%d, ep:%s:%u dnode:%d", pReq->srcVgId, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
28!
272

273
  info.config.syncCfg = *pCfg;
28✔
274

275
  ret = vnodeSaveInfo(dir, &info);
28✔
276
  if (ret < 0) {
28!
277
    vError("vgId:%d, failed to save vnode config since %s", pReq->dstVgId, tstrerror(terrno));
×
278
    return ret;
×
279
  }
280

281
  ret = vnodeCommitInfo(dir);
28✔
282
  if (ret < 0) {
28!
283
    vError("vgId:%d, failed to commit vnode config since %s", pReq->dstVgId, tstrerror(terrno));
×
284
    return ret;
×
285
  }
286

287
  vInfo("vgId:%d, rename %s to %s", pReq->dstVgId, srcPath, dstPath);
28!
288
  ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, diskPrimary, pTfs);
28✔
289
  if (ret < 0) {
28!
290
    vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath,
×
291
           tstrerror(terrno));
292
    return ret;
×
293
  }
294

295
  vInfo("vgId:%d, vnode hashrange is altered", info.config.vgId);
28!
296
  return 0;
28✔
297
}
298

299
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
×
300
                             int32_t diskPrimary, STfs *pTfs) {
301
  SVnodeInfo info = {0};
×
302
  char       dir[TSDB_FILENAME_LEN] = {0};
×
303
  int32_t    code = 0;
×
304

305
  vnodeGetPrimaryDir(dstPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
×
306
  if (vnodeLoadInfo(dir, &info) == 0) {
×
307
    if (info.config.vgId != dstVgId) {
×
308
      vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
×
309
      return TSDB_CODE_FAILED;
×
310
    }
311
    return dstVgId;
×
312
  }
313

314
  vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
×
315
  if ((code = vnodeLoadInfo(dir, &info)) < 0) {
×
316
    vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno));
×
317
    return code;
×
318
  }
319

320
  if (info.config.vgId == srcVgId) {
×
321
    vInfo("vgId:%d, rollback alter hashrange", srcVgId);
×
322
    return srcVgId;
×
323
  } else if (info.config.vgId != dstVgId) {
×
324
    vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
×
325
    return TSDB_CODE_FAILED;
×
326
  }
327

328
  vInfo("vgId:%d, rename %s to %s", dstVgId, srcPath, dstPath);
×
329
  if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs) < 0) {
×
330
    vError("vgId:%d, failed to rename vnode from %s to %s since %s", dstVgId, srcPath, dstPath, tstrerror(terrno));
×
331
    return TSDB_CODE_FAILED;
×
332
  }
333

334
  return dstVgId;
×
335
}
336

337
void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs, int32_t nodeId) {
4,632✔
338
  vInfo("path:%s is removed while destroy vnode", path);
4,632✔
339
  if (tfsRmdir(pTfs, path) < 0) {
4,632!
340
    vError("failed to remove path:%s since %s", path, tstrerror(terrno));
×
341
  }
342

343
  // int32_t nlevel = tfsGetLevel(pTfs);
344
#ifdef USE_S3
345
  if (nodeId > 0 && vgId > 0 /*&& nlevel > 1*/ && tsS3Enabled) {
4,632!
346
    char vnode_prefix[TSDB_FILENAME_LEN];
347
    snprintf(vnode_prefix, TSDB_FILENAME_LEN, "%d/v%df", nodeId, vgId);
×
348
    tcsDeleteObjectsByPrefix(vnode_prefix);
×
349
  }
350
#endif
351
}
4,632✔
352

353
static int32_t vnodeCheckDisk(int32_t diskPrimary, STfs *pTfs) {
11,903✔
354
  int32_t ndisk = 1;
11,903✔
355
  if (pTfs) {
11,903!
356
    ndisk = tfsGetDisksAtLevel(pTfs, 0);
11,903✔
357
  }
358
  if (diskPrimary < 0 || diskPrimary >= ndisk) {
11,906!
359
    vError("disk:%d is unavailable from the %d disks mounted at level 0", diskPrimary, ndisk);
×
360
    return terrno = TSDB_CODE_FS_INVLD_CFG;
×
361
  }
362
  return 0;
11,906✔
363
}
364

365
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb, bool force) {
11,894✔
366
  SVnode    *pVnode = NULL;
11,894✔
367
  SVnodeInfo info = {0};
11,894✔
368
  char       dir[TSDB_FILENAME_LEN] = {0};
11,894✔
369
  char       tdir[TSDB_FILENAME_LEN * 2] = {0};
11,894✔
370
  int32_t    ret = 0;
11,894✔
371
  terrno = TSDB_CODE_SUCCESS;
11,894✔
372

373
  if (vnodeCheckDisk(diskPrimary, pTfs)) {
11,903!
374
    vError("failed to open vnode from %s since %s. diskPrimary:%d", path, terrstr(), diskPrimary);
×
375
    return NULL;
×
376
  }
377
  vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
11,906✔
378

379
  info.config = vnodeCfgDefault;
11,907✔
380

381
  // load vnode info
382
  vInfo("vgId:%d, start to vnode load info %s", info.config.vgId, dir);
11,907✔
383
  ret = vnodeLoadInfo(dir, &info);
11,913✔
384
  if (ret < 0) {
11,905!
385
    vError("failed to open vnode from %s since %s", path, tstrerror(terrno));
×
386
    terrno = TSDB_CODE_NEED_RETRY;
×
387
    return NULL;
×
388
  }
389

390
  if (vnodeMkDir(pTfs, path)) {
11,905!
UNCOV
391
    vError("vgId:%d, failed to prepare vnode dir since %s, path: %s", info.config.vgId, strerror(ERRNO), path);
×
392
    return NULL;
×
393
  }
394
  // save vnode info on dnode ep changed
395
  bool      updated = false;
11,867✔
396
  SSyncCfg *pCfg = &info.config.syncCfg;
11,867✔
397
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
29,828✔
398
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
17,924✔
399
    if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
17,924!
400
      updated = true;
×
401
    }
402
  }
403
  if (updated) {
11,904!
404
    vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId);
×
405
    if (vnodeSaveInfo(dir, &info) < 0) {
×
406
      vError("vgId:%d, failed to save vnode info since %s", info.config.vgId, tstrerror(terrno));
×
407
    }
408

409
    if (vnodeCommitInfo(dir) < 0) {
×
410
      vError("vgId:%d, failed to commit vnode info since %s", info.config.vgId, tstrerror(terrno));
×
411
    }
412
  }
413

414
  // create handle
415
  pVnode = taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1);
11,895!
416
  if (pVnode == NULL) {
11,864!
417
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
418
    vError("vgId:%d, failed to open vnode since %s", info.config.vgId, tstrerror(terrno));
×
419
    return NULL;
×
420
  }
421

422
  pVnode->path = (char *)&pVnode[1];
11,864✔
423
  memcpy(pVnode->path, path, strlen(path) + 1);
11,864✔
424
  pVnode->config = info.config;
11,864✔
425
  pVnode->state.committed = info.state.committed;
11,864✔
426
  pVnode->state.commitTerm = info.state.commitTerm;
11,864✔
427
  pVnode->state.commitID = info.state.commitID;
11,864✔
428
  pVnode->state.applied = info.state.committed;
11,864✔
429
  pVnode->state.applyTerm = info.state.commitTerm;
11,864✔
430
  pVnode->pTfs = pTfs;
11,864✔
431
  pVnode->diskPrimary = diskPrimary;
11,864✔
432
  pVnode->msgCb = msgCb;
11,864✔
433
  (void)taosThreadMutexInit(&pVnode->lock, NULL);
11,864✔
434
  pVnode->blocked = false;
11,855✔
435
  pVnode->disableWrite = false;
11,855✔
436

437
  if (tsem_init(&pVnode->syncSem, 0, 0) != 0) {
11,855!
438
    vError("vgId:%d, failed to init semaphore", TD_VID(pVnode));
×
439
    goto _err;
×
440
  }
441
  (void)taosThreadMutexInit(&pVnode->mutex, NULL);
11,855✔
442
  (void)taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
11,898✔
443

444
  int8_t rollback = vnodeShouldRollback(pVnode);
11,860✔
445

446
  // open buffer pool
447
  vInfo("vgId:%d, start to open vnode buffer pool", TD_VID(pVnode));
11,879✔
448
  if (vnodeOpenBufPool(pVnode) < 0) {
11,897!
449
    vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
×
450
    goto _err;
×
451
  }
452

453
  // open meta
454
  (void)taosThreadRwlockInit(&pVnode->metaRWLock, NULL);
11,906✔
455
  vInfo("vgId:%d, start to open vnode meta", TD_VID(pVnode));
11,907✔
456
  if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) {
11,907!
457
    vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
×
458
    goto _err;
×
459
  }
460

461
  vInfo("vgId:%d, start to upgrade meta", TD_VID(pVnode));
11,900✔
462
  if (metaUpgrade(pVnode, &pVnode->pMeta) < 0) {
11,907!
463
    vError("vgId:%d, failed to upgrade meta since %s", TD_VID(pVnode), tstrerror(terrno));
×
464
  }
465

466
  // open tsdb
467
  vInfo("vgId:%d, start to open vnode tsdb", TD_VID(pVnode));
11,904✔
468
  if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback, force) < 0) {
11,907!
469
    vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
×
470
    goto _err;
×
471
  }
472

473
  // open wal
474
  (void)tsnprintf(tdir, sizeof(tdir), "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
11,902✔
475
  ret = taosRealPath(tdir, NULL, sizeof(tdir));
11,900✔
476
  TAOS_UNUSED(ret);
477

478
  vInfo("vgId:%d, start to open vnode wal", TD_VID(pVnode));
11,893✔
479
  pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
11,902✔
480
  if (pVnode->pWal == NULL) {
11,902!
481
    vError("vgId:%d, failed to open vnode wal since %s. wal:%s", TD_VID(pVnode), tstrerror(terrno), tdir);
×
482
    goto _err;
×
483
  }
484

485
  // open tq
486
  (void)tsnprintf(tdir, sizeof(tdir), "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
11,902✔
487
  ret = taosRealPath(tdir, NULL, sizeof(tdir));
11,902✔
488
  TAOS_UNUSED(ret);
489

490
  // init handle map for stream event notification
491
  ret = tqInitNotifyHandleMap(&pVnode->pNotifyHandleMap);
11,901✔
492
  if (ret != TSDB_CODE_SUCCESS) {
11,886!
493
    vError("vgId:%d, failed to init StreamNotifyHandleMap", TD_VID(pVnode));
×
494
    terrno = ret;
×
495
    goto _err;
×
496
  }
497

498
  // open query
499
  vInfo("vgId:%d, start to open vnode query", TD_VID(pVnode));
11,886✔
500
  if (vnodeQueryOpen(pVnode)) {
11,905!
501
    vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
×
502
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
503
    goto _err;
×
504
  }
505

506
  // sma required the tq is initialized before the vnode open
507
  vInfo("vgId:%d, start to open vnode tq", TD_VID(pVnode));
11,907✔
508
  if (tqOpen(tdir, pVnode)) {
11,908!
509
    vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
×
510
    goto _err;
×
511
  }
512

513
  // open sma
514
  vInfo("vgId:%d, start to open vnode sma", TD_VID(pVnode));
11,903✔
515
  if (smaOpen(pVnode, rollback, force)) {
11,907!
516
    vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
×
517
    goto _err;
×
518
  }
519

520
  // vnode begin
521
  vInfo("vgId:%d, start to begin vnode", TD_VID(pVnode));
11,906✔
522
  if (vnodeBegin(pVnode) < 0) {
11,907!
523
    vError("vgId:%d, failed to begin since %s", TD_VID(pVnode), tstrerror(terrno));
×
524
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
525
    goto _err;
×
526
  }
527

528
  // open sync
529
  vInfo("vgId:%d, start to open sync, changeVersion:%d", TD_VID(pVnode), info.config.syncCfg.changeVersion);
11,906✔
530
  if (vnodeSyncOpen(pVnode, dir, info.config.syncCfg.changeVersion)) {
11,907!
531
    vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
×
532
    goto _err;
×
533
  }
534

535
  if (rollback) {
11,907!
536
    vnodeRollback(pVnode);
×
537
  }
538

539
  snprintf(pVnode->monitor.strClusterId, TSDB_CLUSTER_ID_LEN, "%" PRId64, pVnode->config.syncCfg.nodeInfo[0].clusterId);
11,907✔
540
  snprintf(pVnode->monitor.strDnodeId, TSDB_NODE_ID_LEN, "%" PRId32, pVnode->config.syncCfg.nodeInfo[0].nodeId);
11,907✔
541
  snprintf(pVnode->monitor.strVgId, TSDB_VGROUP_ID_LEN, "%" PRId32, pVnode->config.vgId);
11,907✔
542

543
  return pVnode;
11,907✔
544

545
_err:
×
546
  if (pVnode->pQuery) vnodeQueryClose(pVnode);
×
547
  if (pVnode->pTq) tqClose(pVnode->pTq);
×
548
  if (pVnode->pWal) walClose(pVnode->pWal);
×
549
  if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
×
550
  if (pVnode->pSma) smaClose(pVnode->pSma);
×
551
  if (pVnode->pMeta) metaClose(&pVnode->pMeta);
×
552
  if (pVnode->freeList) vnodeCloseBufPool(pVnode);
×
553

554
  (void)taosThreadRwlockDestroy(&pVnode->metaRWLock);
×
555
  taosMemoryFree(pVnode);
×
556
  return NULL;
×
557
}
558

559
void vnodePreClose(SVnode *pVnode) {
11,905✔
560
  vnodeSyncPreClose(pVnode);
11,905✔
561
  vnodeQueryPreClose(pVnode);
11,905✔
562
}
11,903✔
563

564
void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
11,907✔
565

566
void vnodeClose(SVnode *pVnode) {
11,899✔
567
  if (pVnode) {
11,899!
568
    vnodeAWait(&pVnode->commitTask);
11,906✔
569
    vnodeSyncClose(pVnode);
11,906✔
570
    vnodeQueryClose(pVnode);
11,901✔
571
    tqDestroyNotifyHandleMap(&pVnode->pNotifyHandleMap);
11,907✔
572
    tqClose(pVnode->pTq);
11,906✔
573
    walClose(pVnode->pWal);
11,907✔
574
    if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
11,907!
575
    smaClose(pVnode->pSma);
11,905✔
576
    if (pVnode->pMeta) metaClose(&pVnode->pMeta);
11,901!
577
    vnodeCloseBufPool(pVnode);
11,904✔
578

579
    // destroy handle
580
    if (tsem_destroy(&pVnode->syncSem) != 0) {
11,907!
581
      vError("vgId:%d, failed to destroy semaphore", TD_VID(pVnode));
×
582
    }
583
    (void)taosThreadCondDestroy(&pVnode->poolNotEmpty);
11,907✔
584
    (void)taosThreadMutexDestroy(&pVnode->mutex);
11,907✔
585
    (void)taosThreadMutexDestroy(&pVnode->lock);
11,907✔
586
    taosMemoryFree(pVnode);
11,907!
587
  }
588
}
11,900✔
589

590
// start the sync timer after the queue is ready
591
int32_t vnodeStart(SVnode *pVnode) {
11,907✔
592
  if (pVnode == NULL) {
11,907!
593
    return TSDB_CODE_INVALID_PARA;
×
594
  }
595
  return vnodeSyncStart(pVnode);
11,907✔
596
}
597

598
int32_t vnodeIsCatchUp(SVnode *pVnode) { return syncIsCatchUp(pVnode->sync); }
3,865✔
599

600
ESyncRole vnodeGetRole(SVnode *pVnode) { return syncGetRole(pVnode->sync); }
3,865✔
601

602
int32_t vnodeUpdateArbTerm(SVnode *pVnode, int64_t arbTerm) { return syncUpdateArbTerm(pVnode->sync, arbTerm); }
110✔
603
int32_t vnodeGetArbToken(SVnode *pVnode, char *outToken) { return syncGetArbToken(pVnode->sync, outToken); }
123✔
604

605
void vnodeStop(SVnode *pVnode) {}
×
606

607
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc