• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4541

19 Jul 2025 01:13AM UTC coverage: 56.753% (-1.6%) from 58.31%
#4541

push

travis-ci

web-flow
fix: subquery memleak (#32024)

124299 of 282344 branches covered (44.02%)

Branch coverage included in aggregate %.

181106 of 255787 relevant lines covered (70.8%)

24937406.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.69
/source/dnode/vnode/src/vnd/vnodeCommit.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17
#include "sync.h"
18
#include "vnd.h"
19
#include "vnodeInt.h"
20

21
extern int32_t tsdbPreCommit(STsdb *pTsdb);
22
extern int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo);
23
extern int32_t tsdbCommitCommit(STsdb *pTsdb);
24
extern int32_t tsdbCommitAbort(STsdb *pTsdb);
25

26
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
27

28
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
29
static int vnodeCommitImpl(SCommitInfo *pInfo);
30

31
#define WAIT_TIME_MILI_SEC 10  // miliseconds
32

33
static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) {
×
34
  int32_t code = 0;
×
35

36
  if (pVnode->onRecycle == NULL) {
×
37
    if (pVnode->recycleHead == NULL) {
×
38
      vDebug("vgId:%d, no recyclable buffer pool", TD_VID(pVnode));
×
39
      goto _exit;
×
40
    } else {
41
      vDebug("vgId:%d, buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead,
×
42
             pVnode->recycleHead->id);
43

44
      pVnode->onRecycle = pVnode->recycleHead;
×
45
      if (pVnode->recycleHead == pVnode->recycleTail) {
×
46
        pVnode->recycleHead = pVnode->recycleTail = NULL;
×
47
      } else {
48
        pVnode->recycleHead = pVnode->recycleHead->recycleNext;
×
49
        pVnode->recycleHead->recyclePrev = NULL;
×
50
      }
51
      pVnode->onRecycle->recycleNext = pVnode->onRecycle->recyclePrev = NULL;
×
52
    }
53
  }
54

55
  code = vnodeBufPoolRecycle(pVnode->onRecycle);
×
56
  if (code) goto _exit;
×
57

58
_exit:
×
59
  if (code) {
×
60
    vError("vgId:%d, %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code));
×
61
  }
62
  return code;
×
63
}
64
static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
49,812✔
65
  int32_t code = 0;
49,812✔
66
  int32_t lino = 0;
49,812✔
67

68
  (void)taosThreadMutexLock(&pVnode->mutex);
49,812✔
69

70
  int32_t nTry = 0;
49,812✔
71
  for (;;) {
72
    ++nTry;
49,812✔
73

74
    if (pVnode->freeList) {
49,812!
75
      vDebug("vgId:%d, allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList,
49,812✔
76
             pVnode->freeList->id);
77

78
      pVnode->inUse = pVnode->freeList;
49,812✔
79
      pVnode->inUse->nRef = 1;
49,812✔
80
      pVnode->freeList = pVnode->inUse->freeNext;
49,812✔
81
      pVnode->inUse->freeNext = NULL;
49,812✔
82
      break;
49,812✔
83
    } else {
84
      vDebug("vgId:%d, no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry);
×
85

86
      code = vnodeTryRecycleBufPool(pVnode);
×
87
      TSDB_CHECK_CODE(code, lino, _exit);
×
88

89
      if (pVnode->freeList == NULL) {
×
90
        vDebug("vgId:%d, no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC);
×
91

92
        struct timeval  tv;
93
        struct timespec ts;
94
        if (taosGetTimeOfDay(&tv) != 0) {
×
95
          continue;
×
96
        }
97
        ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000;
×
98
        if (ts.tv_nsec > 999999999l) {
×
99
          ts.tv_sec = tv.tv_sec + 1;
×
100
          ts.tv_nsec -= 1000000000l;
×
101
        } else {
102
          ts.tv_sec = tv.tv_sec;
×
103
        }
104

105
        code = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts);
×
106
        // ignore timeout error and retry
107
        if (code == TSDB_CODE_TIMEOUT_ERROR) {
×
108
          code = TSDB_CODE_SUCCESS;
×
109
        }
110
        TSDB_CHECK_CODE(code, lino, _exit);
×
111
      }
112
    }
113
  }
114

115
_exit:
49,812✔
116
  (void)taosThreadMutexUnlock(&pVnode->mutex);
49,812✔
117
  if (code) {
49,812!
118
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
×
119
  }
120
  return code;
49,812✔
121
}
122
int vnodeBegin(SVnode *pVnode) {
49,812✔
123
  int32_t code = 0;
49,812✔
124
  int32_t lino = 0;
49,812✔
125

126
  // alloc buffer pool
127
  code = vnodeGetBufPoolToUse(pVnode);
49,812✔
128
  TSDB_CHECK_CODE(code, lino, _exit);
49,812!
129

130
  // begin meta
131
  code = metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL);
49,812✔
132
  TSDB_CHECK_CODE(code, lino, _exit);
49,812!
133

134
  // begin tsdb
135
  code = tsdbBegin(pVnode->pTsdb);
49,812✔
136
  TSDB_CHECK_CODE(code, lino, _exit);
49,806!
137

138
  // begin sma
139
  if (VND_IS_RSMA(pVnode)) {
49,806✔
140
    code = smaBegin(pVnode->pSma);
19✔
141
    TSDB_CHECK_CODE(code, lino, _exit);
19!
142
  }
143

144
_exit:
49,806✔
145
  if (code) {
49,806!
146
    terrno = code;
×
147
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
×
148
  }
149
  return code;
49,810✔
150
}
151

152
int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
19,383,649✔
153
  bool diskAvail = osDataSpaceAvailable();
19,383,649✔
154
  bool needCommit = false;
19,383,705✔
155

156
  (void)taosThreadMutexLock(&pVnode->mutex);
19,383,705✔
157
  if (pVnode->inUse && diskAvail) {
19,384,448!
158
    needCommit = (pVnode->inUse->size > pVnode->inUse->node.size) ||
19,396,669✔
159
                 (atExit && (pVnode->inUse->size > 0 || pVnode->pMeta->changed ||
12,227✔
160
                             pVnode->state.applied - pVnode->state.committed > 4096));
4,293✔
161
  }
162
  vTrace("vgId:%d, should commit:%d, disk available:%d, buffer size:%" PRId64 ", node size:%" PRId64
19,384,448!
163
         ", meta changed:%d"
164
         ", state:[%" PRId64 ",%" PRId64 "]",
165
         TD_VID(pVnode), needCommit, diskAvail, pVnode->inUse ? pVnode->inUse->size : 0,
166
         pVnode->inUse ? pVnode->inUse->node.size : 0, pVnode->pMeta->changed, pVnode->state.applied,
167
         pVnode->state.committed);
168
  (void)taosThreadMutexUnlock(&pVnode->mutex);
19,384,448✔
169
  return needCommit;
19,384,512✔
170
}
171

172
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
48,538✔
173
  int32_t   code = 0;
48,538✔
174
  int32_t   lino;
175
  char      fname[TSDB_FILENAME_LEN];
176
  TdFilePtr pFile = NULL;
48,538✔
177
  char     *data = NULL;
48,538✔
178

179
  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
48,538✔
180

181
  code = vnodeEncodeInfo(pInfo, &data);
48,538✔
182
  TSDB_CHECK_CODE(code, lino, _exit);
48,523!
183

184
  // save info to a vnode_tmp.json
185
  pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
48,523✔
186
  if (pFile == NULL) {
48,368!
187
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
188
  }
189

190
  if (taosWriteFile(pFile, data, strlen(data)) < 0) {
48,368!
191
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
192
  }
193

194
  if (taosFsyncFile(pFile) < 0) {
48,432!
195
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
196
  }
197

198
_exit:
48,556✔
199
  if (code) {
48,556!
200
    vError("vgId:%d %s failed at %s:%d since %s", pInfo->config.vgId, __func__, __FILE__, lino, tstrerror(code));
×
201
  } else {
202
    vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", pInfo->config.vgId, fname,
48,556✔
203
          pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, pInfo->config.syncCfg.changeVersion);
204
  }
205
  if (taosCloseFile(&pFile) != 0) {
48,560!
206
    vError("vgId:%d, failed to close file", pInfo->config.vgId);
×
207
  }
208
  taosMemoryFree(data);
48,562!
209
  TAOS_RETURN(code);
48,562✔
210
}
211

212
int vnodeCommitInfo(const char *dir) {
48,561✔
213
  char fname[TSDB_FILENAME_LEN];
214
  char tfname[TSDB_FILENAME_LEN];
215

216
  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);
48,561✔
217
  snprintf(tfname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
48,561✔
218

219
  int32_t code = taosRenameFile(tfname, fname);
48,561✔
220
  if (code < 0) {
48,554!
221
    return code;
×
222
  }
223

224
  vInfo("vnode info is committed, dir:%s", dir);
48,554✔
225
  return 0;
48,562✔
226
}
227

228
int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
28,768✔
229
  int32_t   code = 0;
28,768✔
230
  int32_t   lino;
231
  char      fname[TSDB_FILENAME_LEN];
232
  TdFilePtr pFile = NULL;
28,768✔
233
  char     *pData = NULL;
28,768✔
234
  int64_t   size;
235

236
  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);
28,768✔
237

238
  // read info
239
  pFile = taosOpenFile(fname, TD_FILE_READ);
28,768✔
240
  if (pFile == NULL) {
28,771✔
241
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
11,500!
242
  }
243

244
  code = taosFStatFile(pFile, &size, NULL);
17,271✔
245
  TSDB_CHECK_CODE(code, lino, _exit);
17,253!
246

247
  pData = taosMemoryMalloc(size + 1);
17,253!
248
  if (pData == NULL) {
17,277!
249
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
250
  }
251

252
  if (taosReadFile(pFile, pData, size) < 0) {
17,277!
253
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
254
  }
255

256
  pData[size] = '\0';
17,261✔
257

258
  // decode info
259
  code = vnodeDecodeInfo(pData, pInfo);
17,261✔
260
  TSDB_CHECK_CODE(code, lino, _exit);
17,262!
261

262
  pInfo->config.walCfg.committed = pInfo->state.committed;
17,262✔
263
_exit:
28,762✔
264
  if (code) {
28,762✔
265
    vError("vgId:%d %s failed at %s:%d since %s, file:%s", pInfo->config.vgId, __func__, __FILE__, lino,
11,500✔
266
           tstrerror(code), fname);
267
  }
268
  taosMemoryFree(pData);
28,769!
269
  if (taosCloseFile(&pFile) != 0) {
28,773!
270
    vError("vgId:%d, failed to close file", pInfo->config.vgId);
×
271
  }
272
  return code;
28,781✔
273
}
274

275
static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
34,734✔
276
  int32_t code = 0;
34,734✔
277
  int32_t lino = 0;
34,734✔
278
  char    dir[TSDB_FILENAME_LEN] = {0};
34,734✔
279
  int64_t lastCommitted = pInfo->info.state.committed;
34,734✔
280

281
  // wait last commit task
282
  vnodeAWait(&pVnode->commitTask);
34,734✔
283

284
  code = syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg);
34,739✔
285
  TSDB_CHECK_CODE(code, lino, _exit);
34,745!
286

287
  pVnode->state.commitTerm = pVnode->state.applyTerm;
34,745✔
288

289
  pInfo->info.config = pVnode->config;
34,745✔
290
  pInfo->info.state.committed = pVnode->state.applied;
34,745✔
291
  pInfo->info.state.commitTerm = pVnode->state.applyTerm;
34,745✔
292
  pInfo->info.state.commitID = ++pVnode->state.commitID;
34,745✔
293
  pInfo->pVnode = pVnode;
34,745✔
294
  pInfo->txn = metaGetTxn(pVnode->pMeta);
34,745✔
295

296
  // save info
297
  vnodeGetPrimaryPath(pVnode, false, dir, TSDB_FILENAME_LEN);
34,745✔
298

299
  vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
34,738✔
300
  code = vnodeSaveInfo(dir, &pInfo->info);
34,738✔
301
  TSDB_CHECK_CODE(code, lino, _exit);
34,751!
302

303
  code = tsdbPreCommit(pVnode->pTsdb);
34,751✔
304
  TSDB_CHECK_CODE(code, lino, _exit);
34,751!
305

306
  code = metaPrepareAsyncCommit(pVnode->pMeta);
34,751✔
307
  TSDB_CHECK_CODE(code, lino, _exit);
34,750!
308

309
  code = smaPrepareAsyncCommit(pVnode->pSma);
34,750✔
310
  TSDB_CHECK_CODE(code, lino, _exit);
34,749!
311

312
  (void)taosThreadMutexLock(&pVnode->mutex);
34,749✔
313
  pVnode->onCommit = pVnode->inUse;
34,750✔
314
  pVnode->inUse = NULL;
34,750✔
315
  (void)taosThreadMutexUnlock(&pVnode->mutex);
34,750✔
316

317
_exit:
34,750✔
318
  if (code) {
34,750!
319
    vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino,
×
320
           tstrerror(code), pVnode->state.commitID);
321
  } else {
322
    vDebug("vgId:%d, %s done, commit id:%" PRId64, TD_VID(pVnode), __func__, pInfo->info.state.commitID);
34,750✔
323
  }
324

325
  return code;
34,750✔
326
}
327
static void vnodeReturnBufPool(SVnode *pVnode) {
34,751✔
328
  (void)taosThreadMutexLock(&pVnode->mutex);
34,751✔
329

330
  SVBufPool *pPool = pVnode->onCommit;
34,751✔
331
  int32_t    nRef = atomic_sub_fetch_32(&pPool->nRef, 1);
34,751✔
332

333
  pVnode->onCommit = NULL;
34,749✔
334
  if (nRef == 0) {
34,749✔
335
    vnodeBufPoolAddToFreeList(pPool);
34,389✔
336
  } else if (nRef > 0) {
360!
337
    vDebug("vgId:%d, buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id);
360✔
338

339
    if (pVnode->recycleTail == NULL) {
360✔
340
      pPool->recyclePrev = pPool->recycleNext = NULL;
359✔
341
      pVnode->recycleHead = pVnode->recycleTail = pPool;
359✔
342
    } else {
343
      pPool->recyclePrev = pVnode->recycleTail;
1✔
344
      pPool->recycleNext = NULL;
1✔
345
      pVnode->recycleTail->recycleNext = pPool;
1✔
346
      pVnode->recycleTail = pPool;
1✔
347
    }
348
  } else {
349
    vError("vgId:%d, buffer pool %p of id %d nRef:%d", TD_VID(pVnode), pPool, pPool->id, nRef);
×
350
  }
351

352
  (void)taosThreadMutexUnlock(&pVnode->mutex);
34,750✔
353
}
34,751✔
354
static int32_t vnodeCommit(void *arg) {
34,751✔
355
  int32_t code = 0;
34,751✔
356

357
  SCommitInfo *pInfo = (SCommitInfo *)arg;
34,751✔
358
  SVnode      *pVnode = pInfo->pVnode;
34,751✔
359

360
  // commit
361
  METRICS_TIMING_BLOCK(pVnode->writeMetrics.commit_time, METRIC_LEVEL_HIGH, {
34,751!
362
    if ((code = vnodeCommitImpl(pInfo))) {
363
      vFatal("vgId:%d, failed to commit vnode since %s", TD_VID(pVnode), terrstr());
364
      taosMsleep(100);
365
      exit(EXIT_FAILURE);
366
      goto _exit;
367
    }
368
  });
369

370
  METRICS_UPDATE(pVnode->writeMetrics.commit_count, METRIC_LEVEL_HIGH, 1);
34,751!
371

372
  vnodeReturnBufPool(pVnode);
34,751✔
373

374
_exit:
34,751✔
375
  taosMemoryFree(arg);
34,751!
376
  return code;
34,751✔
377
}
378

379
static int32_t vnodeBseCommit(void *arg) {
×
380
  int32_t      code = 0;
×
381
  SCommitInfo *pInfo = (SCommitInfo *)arg;
×
382
  SVnode      *pVnode = pInfo->pVnode;
×
383

384
  code = bseCommit(pVnode->pBse);
×
385
_exit:
×
386
  taosMemoryFree(arg);
×
387
  return code;
×
388
}
389

390
static void vnodeCommitCancel(void *arg) { taosMemoryFree(arg); }
×
391

392
int vnodeAsyncCommit(SVnode *pVnode) {
34,749✔
393
  int32_t code = 0;
34,749✔
394
  int32_t lino = 0;
34,749✔
395

396
  SCommitInfo *pInfo = (SCommitInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
34,749!
397
  if (NULL == pInfo) {
34,726!
398
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
399
  }
400
  // SCommitInfo *pBseCommitInfo = (SCommitInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
401
  // if (NULL == pInfo) {
402
  //   TSDB_CHECK_CODE(code = terrno, lino, _exit);
403
  // }
404
  // pBseCommitInfo->pVnode = pVnode;
405

406
  // prepare to commit
407
  code = vnodePrepareCommit(pVnode, pInfo);
34,726✔
408
  TSDB_CHECK_CODE(code, lino, _exit);
34,748!
409

410
  // schedule the task
411
  code = vnodeAsync(COMMIT_TASK_ASYNC, EVA_PRIORITY_HIGH, vnodeCommit, vnodeCommitCancel, pInfo, &pVnode->commitTask);
34,748✔
412
  TSDB_CHECK_CODE(code, lino, _exit);
34,751!
413

414
_exit:
34,751✔
415
  if (code) {
34,751!
416
    taosMemoryFree(pInfo);
×
417
    // taosMemoryFree(pBseCommitInfo);
418
    vError("vgId:%d %s failed at line %d since %s" PRId64, TD_VID(pVnode), __func__, lino, tstrerror(code));
×
419
  } else {
420
    vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64, TD_VID(pVnode),
34,751✔
421
          pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied);
422
  }
423
  return code;
34,751✔
424
}
425

426
int32_t vnodeSyncCommit(SVnode *pVnode) {
146✔
427
  int32_t lino;
428
  int32_t code = vnodeAsyncCommit(pVnode);
146✔
429
  TSDB_CHECK_CODE(code, lino, _exit);
146!
430
  vnodeAWait(&pVnode->commitTask);
146✔
431

432
_exit:
146✔
433
  if (code) {
146!
434
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
×
435
  } else {
436
    vInfo("vgId:%d, sync commit end", TD_VID(pVnode));
146!
437
  }
438

439
  return code;
146✔
440
}
441

442
static int vnodeCommitImpl(SCommitInfo *pInfo) {
34,750✔
443
  int32_t code = 0;
34,750✔
444
  int32_t lino = 0;
34,750✔
445

446
  char    dir[TSDB_FILENAME_LEN] = {0};
34,750✔
447
  SVnode *pVnode = pInfo->pVnode;
34,750✔
448

449
  vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
34,750✔
450
        pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm);
451

452
  // persist wal before starting
453
  if ((code = walPersist(pVnode->pWal)) < 0) {
34,751!
454
    vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), tstrerror(code));
×
455
    return code;
×
456
  }
457

458
  vnodeGetPrimaryPath(pVnode, false, dir, TSDB_FILENAME_LEN);
34,751✔
459

460
  code = syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);
34,751✔
461
  TSDB_CHECK_CODE(code, lino, _exit);
34,750!
462

463
  code = tsdbCommitBegin(pVnode->pTsdb, pInfo);
34,750✔
464
  TSDB_CHECK_CODE(code, lino, _exit);
34,751!
465

466
  if (!TSDB_CACHE_NO(pVnode->config)) {
34,751✔
467
    METRICS_TIMING_BLOCK(pVnode->writeMetrics.last_cache_commit_time, METRIC_LEVEL_HIGH,
8,279!
468
                         { code = tsdbCacheCommit(pVnode->pTsdb); });
469
    METRICS_UPDATE(pVnode->writeMetrics.last_cache_commit_count, METRIC_LEVEL_HIGH, 1);
8,273!
470
    TSDB_CHECK_CODE(code, lino, _exit);
8,273!
471
  }
472

473
  if (VND_IS_RSMA(pVnode)) {
34,745✔
474
    code = smaCommit(pVnode->pSma, pInfo);
10✔
475
    TSDB_CHECK_CODE(code, lino, _exit);
10!
476
  }
477
  // blob storage engine commit
478
  code = bseCommit(pVnode->pBse);
34,745✔
479
  // commit info
480
  code = vnodeCommitInfo(dir);
34,751✔
481
  TSDB_CHECK_CODE(code, lino, _exit);
34,751!
482

483
  code = tsdbCommitCommit(pVnode->pTsdb);
34,751✔
484
  TSDB_CHECK_CODE(code, lino, _exit);
34,751!
485

486
  if (VND_IS_RSMA(pVnode)) {
34,751✔
487
    code = smaFinishCommit(pVnode->pSma);
10✔
488
    TSDB_CHECK_CODE(code, lino, _exit);
10!
489
  }
490

491
  code = metaFinishCommit(pVnode->pMeta, pInfo->txn);
34,751✔
492
  TSDB_CHECK_CODE(code, lino, _exit);
34,749!
493

494
  pVnode->state.committed = pInfo->info.state.committed;
34,749✔
495

496
  if (smaPostCommit(pVnode->pSma) < 0) {
34,749!
497
    vError("vgId:%d, failed to post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
×
498
    return -1;
×
499
  }
500

501
  code = syncEndSnapshot(pVnode->sync);
34,750✔
502
  TSDB_CHECK_CODE(code, lino, _exit);
34,751!
503

504
  code = tqCommitOffset(pVnode->pTq);
34,751✔
505
  TSDB_CHECK_CODE(code, lino, _exit);
34,751!
506
  
507
_exit:
34,751✔
508
  if (code) {
34,751!
509
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
×
510
  } else {
511
    vInfo("vgId:%d, commit end", TD_VID(pVnode));
34,751✔
512
  }
513
  return code;
34,751✔
514
}
515

516
bool vnodeShouldRollback(SVnode *pVnode) {
15,061✔
517
  char    tFName[TSDB_FILENAME_LEN] = {0};
15,061✔
518
  int32_t offset = 0;
15,061✔
519

520
  vnodeGetPrimaryPath(pVnode, false, tFName, TSDB_FILENAME_LEN);
15,061✔
521
  offset = strlen(tFName);
15,061✔
522
  snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
15,061✔
523

524
  return taosCheckExistFile(tFName);
15,061✔
525
}
526

527
void vnodeRollback(SVnode *pVnode) {
×
528
  char    tFName[TSDB_FILENAME_LEN] = {0};
×
529
  int32_t offset = 0;
×
530

531
  vnodeGetPrimaryPath(pVnode, false, tFName, TSDB_FILENAME_LEN);
×
532
  offset = strlen(tFName);
×
533
  snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
×
534

535
  if (taosRemoveFile(tFName) != 0) {
×
536
    vError("vgId:%d, failed to remove file %s since %s", TD_VID(pVnode), tFName, tstrerror(terrno));
×
537
  }
538
}
×
539

540
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
48,505✔
541
  const SVState *pState = (SVState *)pObj;
48,505✔
542

543
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "commit version", pState->committed));
48,505!
544
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID));
48,553!
545
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm));
48,551✔
546

547
  return 0;
48,548✔
548
}
549

550
static int vnodeDecodeState(const SJson *pJson, void *pObj) {
17,343✔
551
  SVState *pState = (SVState *)pObj;
17,343✔
552

553
  int32_t code;
554
  tjsonGetNumberValue(pJson, "commit version", pState->committed, code);
17,343✔
555
  if (code) return code;
17,365!
556
  tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code);
17,365✔
557
  if (code) return code;
17,368!
558
  tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code);
17,368✔
559
  if (code) return code;
17,368!
560

561
  return 0;
17,368✔
562
}
563

564
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
48,529✔
565
  int32_t code = 0;
48,529✔
566
  int32_t lino;
567
  SJson  *pJson = NULL;
48,529✔
568
  char   *pData = NULL;
48,529✔
569

570
  pJson = tjsonCreateObject();
48,529✔
571
  if (pJson == NULL) {
48,538!
572
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
573
  }
574

575
  code = tjsonAddObject(pJson, "config", vnodeEncodeConfig, (void *)&pInfo->config);
48,538✔
576
  TSDB_CHECK_CODE(code, lino, _exit);
48,542!
577

578
  code = tjsonAddObject(pJson, "state", vnodeEncodeState, (void *)&pInfo->state);
48,542✔
579
  TSDB_CHECK_CODE(code, lino, _exit);
48,549!
580

581
  pData = tjsonToString(pJson);
48,549✔
582
  if (pData == NULL) {
48,546!
583
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
584
  }
585

586
  tjsonDelete(pJson);
48,546✔
587

588
_exit:
48,542✔
589
  if (code) {
48,542!
590
    tjsonDelete(pJson);
×
591
    *ppData = NULL;
×
592
  } else {
593
    *ppData = pData;
48,542✔
594
  }
595
  return code;
48,542✔
596
}
597

598
int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
17,288✔
599
  int32_t code = 0;
17,288✔
600
  int32_t lino;
601
  SJson  *pJson = NULL;
17,288✔
602

603
  pJson = tjsonParse(pData);
17,288✔
604
  if (pJson == NULL) {
17,336!
605
    TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit);
×
606
  }
607

608
  code = tjsonToObject(pJson, "config", vnodeDecodeConfig, (void *)&pInfo->config);
17,336✔
609
  TSDB_CHECK_CODE(code, lino, _exit);
17,352!
610

611
  code = tjsonToObject(pJson, "state", vnodeDecodeState, (void *)&pInfo->state);
17,352✔
612
  TSDB_CHECK_CODE(code, lino, _exit);
17,369!
613

614
_exit:
17,369✔
615
  tjsonDelete(pJson);
17,369✔
616
  return code;
17,366✔
617
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc