• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4858

17 Nov 2025 09:53AM UTC coverage: 64.048% (-0.09%) from 64.135%
#4858

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

218 of 311 new or added lines in 32 files covered. (70.1%)

4856 existing lines in 134 files now uncovered.

151096 of 235910 relevant lines covered (64.05%)

115767925.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.09
/source/dnode/vnode/src/vnd/vnodeCommit.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17
#include "sync.h"
18
#include "vnd.h"
19
#include "vnodeInt.h"
20

21
extern int32_t tsdbPreCommit(STsdb *pTsdb);
22
extern int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo);
23
extern int32_t tsdbCommitCommit(STsdb *pTsdb);
24
extern int32_t tsdbCommitAbort(STsdb *pTsdb);
25

26
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
27

28
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
29
static int vnodeCommitImpl(SCommitInfo *pInfo);
30

31
#define WAIT_TIME_MILI_SEC 10  // miliseconds
32

33
static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) {
×
34
  int32_t code = 0;
×
35

36
  if (pVnode->onRecycle == NULL) {
×
37
    if (pVnode->recycleHead == NULL) {
×
38
      vDebug("vgId:%d, no recyclable buffer pool", TD_VID(pVnode));
×
39
      goto _exit;
×
40
    } else {
41
      vDebug("vgId:%d, buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead,
×
42
             pVnode->recycleHead->id);
43

44
      pVnode->onRecycle = pVnode->recycleHead;
×
45
      if (pVnode->recycleHead == pVnode->recycleTail) {
×
46
        pVnode->recycleHead = pVnode->recycleTail = NULL;
×
47
      } else {
48
        pVnode->recycleHead = pVnode->recycleHead->recycleNext;
×
49
        pVnode->recycleHead->recyclePrev = NULL;
×
50
      }
51
      pVnode->onRecycle->recycleNext = pVnode->onRecycle->recyclePrev = NULL;
×
52
    }
53
  }
54

55
  code = vnodeBufPoolRecycle(pVnode->onRecycle);
×
56
  if (code) goto _exit;
×
57

58
_exit:
×
59
  if (code) {
×
60
    vError("vgId:%d, %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code));
×
61
  }
62
  return code;
×
63
}
64
static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
9,288,903✔
65
  int32_t code = 0;
9,288,903✔
66
  int32_t lino = 0;
9,288,903✔
67

68
  (void)taosThreadMutexLock(&pVnode->mutex);
9,288,903✔
69

70
  int32_t nTry = 0;
9,289,049✔
71
  for (;;) {
72
    ++nTry;
9,289,049✔
73

74
    if (pVnode->freeList) {
9,289,049✔
75
      vDebug("vgId:%d, allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList,
9,289,049✔
76
             pVnode->freeList->id);
77

78
      pVnode->inUse = pVnode->freeList;
9,289,049✔
79
      pVnode->inUse->nRef = 1;
9,289,049✔
80
      pVnode->freeList = pVnode->inUse->freeNext;
9,289,049✔
81
      pVnode->inUse->freeNext = NULL;
9,289,049✔
82
      break;
9,289,049✔
83
    } else {
84
      vDebug("vgId:%d, no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry);
×
85

86
      code = vnodeTryRecycleBufPool(pVnode);
×
87
      TSDB_CHECK_CODE(code, lino, _exit);
×
88

89
      if (pVnode->freeList == NULL) {
×
90
        vDebug("vgId:%d, no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC);
×
91

92
        struct timeval  tv;
×
93
        struct timespec ts;
×
94
        if (taosGetTimeOfDay(&tv) != 0) {
×
95
          continue;
×
96
        }
97
        ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000;
×
98
        if (ts.tv_nsec > 999999999l) {
×
99
          ts.tv_sec = tv.tv_sec + 1;
×
100
          ts.tv_nsec -= 1000000000l;
×
101
        } else {
102
          ts.tv_sec = tv.tv_sec;
×
103
        }
104

105
        code = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts);
×
106
        // ignore timeout error and retry
107
        if (code == TSDB_CODE_TIMEOUT_ERROR) {
×
108
          code = TSDB_CODE_SUCCESS;
×
109
        }
110
        TSDB_CHECK_CODE(code, lino, _exit);
×
111
      }
112
    }
113
  }
114

115
_exit:
9,289,049✔
116
  (void)taosThreadMutexUnlock(&pVnode->mutex);
9,289,049✔
117
  if (code) {
9,289,049✔
118
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
×
119
  }
120
  return code;
9,288,581✔
121
}
122
int vnodeBegin(SVnode *pVnode) {
9,288,945✔
123
  int32_t code = 0;
9,288,945✔
124
  int32_t lino = 0;
9,288,945✔
125

126
  // alloc buffer pool
127
  code = vnodeGetBufPoolToUse(pVnode);
9,288,945✔
128
  TSDB_CHECK_CODE(code, lino, _exit);
9,289,049✔
129

130
  // begin meta
131
  code = metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL);
9,289,049✔
132
  TSDB_CHECK_CODE(code, lino, _exit);
9,286,253✔
133

134
  // begin tsdb
135
  code = tsdbBegin(pVnode->pTsdb);
9,286,253✔
136
  TSDB_CHECK_CODE(code, lino, _exit);
9,286,778✔
137

138
_exit:
9,286,778✔
139
  if (code) {
9,286,778✔
140
    terrno = code;
×
141
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
×
142
  }
143
  return code;
9,287,676✔
144
}
145

146
int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
551,425,088✔
147
  bool diskAvail = osDataSpaceAvailable();
551,425,088✔
148
  bool needCommit = false;
551,425,522✔
149

150
  (void)taosThreadMutexLock(&pVnode->mutex);
551,425,522✔
151
  if (pVnode->inUse && diskAvail) {
551,430,890✔
152
    needCommit = (pVnode->inUse->size > pVnode->inUse->node.size) ||
554,554,762✔
153
                 (atExit && (pVnode->inUse->size > 0 || pVnode->pMeta->changed ||
3,127,773✔
154
                             pVnode->state.applied - pVnode->state.committed > 4096));
1,382,800✔
155
  }
156
  vTrace("vgId:%d, should commit:%d, disk available:%d, buffer size:%" PRId64 ", node size:%" PRId64
551,432,883✔
157
         ", meta changed:%d"
158
         ", state:[%" PRId64 ",%" PRId64 "]",
159
         TD_VID(pVnode), needCommit, diskAvail, pVnode->inUse ? pVnode->inUse->size : 0,
160
         pVnode->inUse ? pVnode->inUse->node.size : 0, pVnode->pMeta->changed, pVnode->state.applied,
161
         pVnode->state.committed);
162
  (void)taosThreadMutexUnlock(&pVnode->mutex);
551,432,883✔
163
  return needCommit;
551,431,554✔
164
}
165

166
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
8,893,161✔
167
  int32_t   code = 0;
8,893,161✔
168
  int32_t   lino;
169
  char      fname[TSDB_FILENAME_LEN];
8,885,641✔
170
  TdFilePtr pFile = NULL;
8,893,161✔
171
  char     *data = NULL;
8,893,161✔
172

173
  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
8,893,161✔
174

175
  code = vnodeEncodeInfo(pInfo, &data);
8,893,161✔
176
  TSDB_CHECK_CODE(code, lino, _exit);
8,889,419✔
177

178
  // save info to a vnode_tmp.json
179
  pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
8,889,419✔
180
  if (pFile == NULL) {
8,882,668✔
181
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
182
  }
183

184
  if (taosWriteFile(pFile, data, strlen(data)) < 0) {
8,882,668✔
185
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
186
  }
187

188
  if (taosFsyncFile(pFile) < 0) {
8,876,106✔
189
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
190
  }
191

192
_exit:
8,889,671✔
193
  if (code) {
8,886,741✔
194
    vError("vgId:%d %s failed at %s:%d since %s", pInfo->config.vgId, __func__, __FILE__, lino, tstrerror(code));
×
195
  } else {
196
    vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", pInfo->config.vgId, fname,
8,886,741✔
197
          pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, pInfo->config.syncCfg.changeVersion);
198
  }
199
  if (taosCloseFile(&pFile) != 0) {
8,891,950✔
200
    vError("vgId:%d, failed to close file", pInfo->config.vgId);
×
201
  }
202
  taosMemoryFree(data);
8,893,161✔
203
  TAOS_RETURN(code);
8,891,796✔
204
}
205

206
int vnodeCommitInfo(const char *dir) {
8,892,122✔
207
  char fname[TSDB_FILENAME_LEN];
8,884,602✔
208
  char tfname[TSDB_FILENAME_LEN];
8,884,602✔
209

210
  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);
8,892,122✔
211
  snprintf(tfname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
8,892,122✔
212

213
  int32_t code = taosRenameFile(tfname, fname);
8,892,122✔
214
  if (code < 0) {
8,889,350✔
215
    return code;
×
216
  }
217

218
  vInfo("vnode info is committed, dir:%s", dir);
8,889,350✔
219
  return 0;
8,892,873✔
220
}
221

222
int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
7,732,186✔
223
  int32_t   code = 0;
7,732,186✔
224
  int32_t   lino;
225
  char      fname[TSDB_FILENAME_LEN];
7,721,793✔
226
  TdFilePtr pFile = NULL;
7,733,620✔
227
  char     *pData = NULL;
7,733,620✔
228
  int64_t   size;
7,723,227✔
229

230
  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);
7,733,620✔
231

232
  // read info
233
  pFile = taosOpenFile(fname, TD_FILE_READ);
7,733,620✔
234
  if (pFile == NULL) {
7,731,400✔
235
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
2,929,316✔
236
  }
237

238
  code = taosFStatFile(pFile, &size, NULL);
4,802,084✔
239
  TSDB_CHECK_CODE(code, lino, _exit);
4,800,874✔
240

241
  pData = taosMemoryMalloc(size + 1);
4,800,874✔
242
  if (pData == NULL) {
4,802,082✔
243
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
244
  }
245

246
  if (taosReadFile(pFile, pData, size) < 0) {
4,802,082✔
247
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
248
  }
249

250
  pData[size] = '\0';
4,801,957✔
251

252
  // decode info
253
  code = vnodeDecodeInfo(pData, pInfo);
4,801,546✔
254
  TSDB_CHECK_CODE(code, lino, _exit);
4,801,506✔
255

256
  pInfo->config.walCfg.committed = pInfo->state.committed;
4,801,506✔
257
_exit:
7,732,020✔
258
  if (code) {
7,733,299✔
259
    vError("vgId:%d %s failed at %s:%d since %s, file:%s", pInfo->config.vgId, __func__, __FILE__, lino,
2,929,964✔
260
           tstrerror(code), fname);
261
  }
262
  taosMemoryFree(pData);
7,732,187✔
263
  if (taosCloseFile(&pFile) != 0) {
7,732,869✔
264
    vError("vgId:%d, failed to close file", pInfo->config.vgId);
×
265
  }
266
  return code;
7,733,620✔
267
}
268

269
static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
5,216,099✔
270
  int32_t code = 0;
5,216,099✔
271
  int32_t lino = 0;
5,216,099✔
272
  char    dir[TSDB_FILENAME_LEN] = {0};
5,216,099✔
273
  int64_t lastCommitted = pInfo->info.state.committed;
5,217,301✔
274

275
  // wait last commit task
276
  vnodeAWait(&pVnode->commitTask);
5,217,260✔
277

278
  code = syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg);
5,218,488✔
279
  TSDB_CHECK_CODE(code, lino, _exit);
5,218,488✔
280

281
  pVnode->state.commitTerm = pVnode->state.applyTerm;
5,218,488✔
282

283
  pInfo->info.config = pVnode->config;
5,218,488✔
284
  pInfo->info.state.committed = pVnode->state.applied;
5,218,488✔
285
  pInfo->info.state.commitTerm = pVnode->state.applyTerm;
5,218,512✔
286
  pInfo->info.state.commitID = ++pVnode->state.commitID;
5,217,755✔
287
  pInfo->pVnode = pVnode;
5,217,902✔
288
  pInfo->txn = metaGetTxn(pVnode->pMeta);
5,218,488✔
289

290
  // save info
291
  vnodeGetPrimaryPath(pVnode, false, dir, TSDB_FILENAME_LEN);
5,217,923✔
292

293
  vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
5,214,612✔
294
  code = vnodeSaveInfo(dir, &pInfo->info);
5,218,691✔
295
  TSDB_CHECK_CODE(code, lino, _exit);
5,218,512✔
296

297
  code = tsdbPreCommit(pVnode->pTsdb);
5,218,512✔
298
  TSDB_CHECK_CODE(code, lino, _exit);
5,218,512✔
299

300
  code = metaPrepareAsyncCommit(pVnode->pMeta);
5,218,512✔
301
  TSDB_CHECK_CODE(code, lino, _exit);
5,213,699✔
302

303
  (void)taosThreadMutexLock(&pVnode->mutex);
5,213,699✔
304
  pVnode->onCommit = pVnode->inUse;
5,216,466✔
305
  pVnode->inUse = NULL;
5,216,147✔
306
  (void)taosThreadMutexUnlock(&pVnode->mutex);
5,211,674✔
307

308
_exit:
5,216,230✔
309
  if (code) {
5,216,230✔
310
    vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino,
×
311
           tstrerror(code), pVnode->state.commitID);
312
  } else {
313
    vDebug("vgId:%d, %s done, commit id:%" PRId64, TD_VID(pVnode), __func__, pInfo->info.state.commitID);
5,216,230✔
314
  }
315

316
  return code;
5,221,033✔
317
}
318
static void vnodeReturnBufPool(SVnode *pVnode) {
5,218,512✔
319
  (void)taosThreadMutexLock(&pVnode->mutex);
5,218,512✔
320

321
  SVBufPool *pPool = pVnode->onCommit;
5,218,512✔
322
  int32_t    nRef = atomic_sub_fetch_32(&pPool->nRef, 1);
5,218,512✔
323

324
  pVnode->onCommit = NULL;
5,218,512✔
325
  if (nRef == 0) {
5,218,512✔
326
    vnodeBufPoolAddToFreeList(pPool);
5,190,284✔
327
  } else if (nRef > 0) {
28,228✔
328
    vDebug("vgId:%d, buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id);
28,228✔
329

330
    if (pVnode->recycleTail == NULL) {
28,228✔
331
      pPool->recyclePrev = pPool->recycleNext = NULL;
28,228✔
332
      pVnode->recycleHead = pVnode->recycleTail = pPool;
28,228✔
333
    } else {
334
      pPool->recyclePrev = pVnode->recycleTail;
×
335
      pPool->recycleNext = NULL;
×
336
      pVnode->recycleTail->recycleNext = pPool;
×
337
      pVnode->recycleTail = pPool;
×
338
    }
339
  } else {
340
    vError("vgId:%d, buffer pool %p of id %d nRef:%d", TD_VID(pVnode), pPool, pPool->id, nRef);
×
341
  }
342

343
  (void)taosThreadMutexUnlock(&pVnode->mutex);
5,218,512✔
344
}
5,218,512✔
345
static int32_t vnodeCommit(void *arg) {
5,218,512✔
346
  int32_t code = 0;
5,218,512✔
347

348
  SCommitInfo *pInfo = (SCommitInfo *)arg;
5,218,512✔
349
  SVnode      *pVnode = pInfo->pVnode;
5,218,512✔
350

351
  // commit
352
  METRICS_TIMING_BLOCK(pVnode->writeMetrics.commit_time, METRIC_LEVEL_HIGH, {
5,218,512✔
353
    if ((code = vnodeCommitImpl(pInfo))) {
354
      vFatal("vgId:%d, failed to commit vnode since %s", TD_VID(pVnode), terrstr());
355
      taosMsleep(100);
356
      exit(EXIT_FAILURE);
357
      goto _exit;
358
    }
359
  });
360

361
  METRICS_UPDATE(pVnode->writeMetrics.commit_count, METRIC_LEVEL_HIGH, 1);
5,218,512✔
362

363
  vnodeReturnBufPool(pVnode);
5,218,512✔
364

365
_exit:
5,218,512✔
366
  taosMemoryFree(arg);
5,218,512✔
367
  return code;
5,218,512✔
368
}
369

370
static void vnodeCommitCancel(void *arg) { taosMemoryFree(arg); }
×
371

372
int vnodeAsyncCommitEx(SVnode *pVnode, bool forceTrim) {
5,216,660✔
373
  int32_t code = 0;
5,216,660✔
374
  int32_t lino = 0;
5,216,660✔
375

376
  SCommitInfo *pInfo = (SCommitInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
5,216,660✔
377
  if (NULL == pInfo) {
5,215,342✔
378
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
379
  }
380

381
  pInfo->forceTrim = forceTrim;
5,215,342✔
382

383
  // prepare to commit
384
  code = vnodePrepareCommit(pVnode, pInfo);
5,215,342✔
385
  TSDB_CHECK_CODE(code, lino, _exit);
5,218,512✔
386

387
  // schedule the task
388
  code = vnodeAsync(COMMIT_TASK_ASYNC, EVA_PRIORITY_HIGH, vnodeCommit, vnodeCommitCancel, pInfo, &pVnode->commitTask);
5,218,512✔
389
  TSDB_CHECK_CODE(code, lino, _exit);
5,218,512✔
390

391
_exit:
5,218,512✔
392
  if (code) {
5,218,512✔
UNCOV
393
    taosMemoryFree(pInfo);
×
UNCOV
394
    vError("vgId:%d %s failed at line %d since %s" PRId64, TD_VID(pVnode), __func__, lino, tstrerror(code));
×
395
  } else {
396
    vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64 " forceTrim:%d",
5,218,512✔
397
          TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied, forceTrim);
398
  }
399
  return code;
5,218,512✔
400
}
401

402
int vnodeAsyncCommit(SVnode *pVnode) { return vnodeAsyncCommitEx(pVnode, false); }
5,213,701✔
403

404
int32_t vnodeSyncCommit(SVnode *pVnode) {
40,280✔
405
  int32_t lino;
406
  int32_t code = vnodeAsyncCommit(pVnode);
40,280✔
407
  TSDB_CHECK_CODE(code, lino, _exit);
40,280✔
408
  vnodeAWait(&pVnode->commitTask);
40,280✔
409

410
_exit:
40,280✔
411
  if (code) {
40,280✔
UNCOV
412
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
×
413
  } else {
414
    vInfo("vgId:%d, sync commit end", TD_VID(pVnode));
40,280✔
415
  }
416

417
  return code;
40,280✔
418
}
419

420
static int vnodeCommitImpl(SCommitInfo *pInfo) {
5,218,512✔
421
  int32_t code = 0;
5,218,512✔
422
  int32_t lino = 0;
5,218,512✔
423

424
  char    dir[TSDB_FILENAME_LEN] = {0};
5,218,512✔
425
  SVnode *pVnode = pInfo->pVnode;
5,218,512✔
426

427
  vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
5,218,512✔
428
        pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm);
429

430
  // persist wal before starting
431
  if ((code = walPersist(pVnode->pWal)) < 0) {
5,218,601✔
UNCOV
432
    vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), tstrerror(code));
×
UNCOV
433
    return code;
×
434
  }
435

436
  vnodeGetPrimaryPath(pVnode, false, dir, TSDB_FILENAME_LEN);
5,218,512✔
437

438
  code = syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);
5,218,512✔
439
  TSDB_CHECK_CODE(code, lino, _exit);
5,218,512✔
440

441
  code = tsdbCommitBegin(pVnode->pTsdb, pInfo);
5,218,512✔
442
  TSDB_CHECK_CODE(code, lino, _exit);
5,218,512✔
443

444
  if (!TSDB_CACHE_NO(pVnode->config)) {
5,218,512✔
445
    METRICS_TIMING_BLOCK(pVnode->writeMetrics.last_cache_commit_time, METRIC_LEVEL_HIGH,
82,441✔
446
                         { code = tsdbCacheCommit(pVnode->pTsdb); });
447
    METRICS_UPDATE(pVnode->writeMetrics.last_cache_commit_count, METRIC_LEVEL_HIGH, 1);
82,441✔
448
    TSDB_CHECK_CODE(code, lino, _exit);
82,441✔
449
  }
450

451
  // blob storage engine commit
452
  code = bseCommit(pVnode->pBse);
5,218,512✔
453
  // commit info
454
  code = vnodeCommitInfo(dir);
5,218,512✔
455
  TSDB_CHECK_CODE(code, lino, _exit);
5,218,512✔
456

457
  code = tsdbCommitCommit(pVnode->pTsdb);
5,218,512✔
458
  TSDB_CHECK_CODE(code, lino, _exit);
5,218,512✔
459

460
  code = metaFinishCommit(pVnode->pMeta, pInfo->txn);
5,218,512✔
461
  TSDB_CHECK_CODE(code, lino, _exit);
5,218,512✔
462

463
  pVnode->state.committed = pInfo->info.state.committed;
5,218,512✔
464

465
  code = syncEndSnapshot(pVnode->sync, pInfo->forceTrim);
5,218,512✔
466
  TSDB_CHECK_CODE(code, lino, _exit);
5,218,512✔
467

468
  code = tqCommitOffset(pVnode->pTq);
5,218,512✔
469
  TSDB_CHECK_CODE(code, lino, _exit);
5,218,512✔
470
  
471
_exit:
5,218,512✔
472
  if (code) {
5,217,349✔
UNCOV
473
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
×
474
  } else {
475
    vInfo("vgId:%d, commit end", TD_VID(pVnode));
5,217,349✔
476
  }
477
  return code;
5,218,512✔
478
}
479

480
bool vnodeShouldRollback(SVnode *pVnode) {
4,075,403✔
481
  char    tFName[TSDB_FILENAME_LEN] = {0};
4,075,403✔
482
  int32_t offset = 0;
4,075,403✔
483

484
  vnodeGetPrimaryPath(pVnode, false, tFName, TSDB_FILENAME_LEN);
4,075,403✔
485
  offset = strlen(tFName);
4,075,403✔
486
  snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
4,075,403✔
487

488
  return taosCheckExistFile(tFName);
4,075,403✔
489
}
490

491
void vnodeRollback(SVnode *pVnode) {
288✔
492
  char    tFName[TSDB_FILENAME_LEN] = {0};
288✔
493
  int32_t offset = 0;
288✔
494

495
  vnodeGetPrimaryPath(pVnode, false, tFName, TSDB_FILENAME_LEN);
288✔
496
  offset = strlen(tFName);
288✔
497
  snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
288✔
498

499
  if (taosRemoveFile(tFName) != 0) {
288✔
UNCOV
500
    vError("vgId:%d, failed to remove file %s since %s", TD_VID(pVnode), tFName, tstrerror(terrno));
×
501
  }
502
}
288✔
503

504
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
8,892,768✔
505
  const SVState *pState = (SVState *)pObj;
8,892,768✔
506

507
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "commit version", pState->committed));
8,892,768✔
508
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID));
8,892,377✔
509
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm));
8,891,859✔
510

511
  return 0;
8,890,329✔
512
}
513

514
static int vnodeDecodeState(const SJson *pJson, void *pObj) {
4,822,634✔
515
  SVState *pState = (SVState *)pObj;
4,822,634✔
516

517
  int32_t code;
518
  tjsonGetNumberValue(pJson, "commit version", pState->committed, code);
4,822,634✔
519
  if (code) return code;
4,822,440✔
520
  tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code);
4,822,440✔
521
  if (code) return code;
4,822,440✔
522
  tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code);
4,822,440✔
523
  if (code) return code;
4,822,422✔
524

525
  return 0;
4,822,422✔
526
}
527

528
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
8,889,740✔
529
  int32_t code = 0;
8,889,740✔
530
  int32_t lino;
531
  SJson  *pJson = NULL;
8,889,740✔
532
  char   *pData = NULL;
8,889,740✔
533

534
  pJson = tjsonCreateObject();
8,889,740✔
535
  if (pJson == NULL) {
8,893,161✔
UNCOV
536
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
537
  }
538

539
  code = tjsonAddObject(pJson, "config", vnodeEncodeConfig, (void *)&pInfo->config);
8,893,161✔
540
  TSDB_CHECK_CODE(code, lino, _exit);
8,892,770✔
541

542
  code = tjsonAddObject(pJson, "state", vnodeEncodeState, (void *)&pInfo->state);
8,892,770✔
543
  TSDB_CHECK_CODE(code, lino, _exit);
8,892,377✔
544

545
  pData = tjsonToString(pJson);
8,892,377✔
546
  if (pData == NULL) {
8,892,404✔
UNCOV
547
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
548
  }
549

550
  tjsonDelete(pJson);
8,892,404✔
551

552
_exit:
8,888,404✔
553
  if (code) {
8,888,404✔
UNCOV
554
    tjsonDelete(pJson);
×
UNCOV
555
    *ppData = NULL;
×
556
  } else {
557
    *ppData = pData;
8,888,404✔
558
  }
559
  return code;
8,886,434✔
560
}
561

562
int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
4,819,018✔
563
  int32_t code = 0;
4,819,018✔
564
  int32_t lino;
565
  SJson  *pJson = NULL;
4,819,018✔
566

567
  pJson = tjsonParse(pData);
4,819,018✔
568
  if (pJson == NULL) {
4,821,790✔
UNCOV
569
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
×
570
  }
571

572
  code = tjsonToObject(pJson, "config", vnodeDecodeConfig, (void *)&pInfo->config);
4,821,790✔
573
  TSDB_CHECK_CODE(code, lino, _exit);
4,822,634✔
574

575
  code = tjsonToObject(pJson, "state", vnodeDecodeState, (void *)&pInfo->state);
4,822,634✔
576
  TSDB_CHECK_CODE(code, lino, _exit);
4,822,422✔
577

578
_exit:
4,822,422✔
579
  tjsonDelete(pJson);
4,822,422✔
580
  return code;
4,820,221✔
581
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc