• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.71
/source/dnode/vnode/src/vnd/vnodeCommit.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17
#include "sync.h"
18
#include "vnd.h"
19
#include "vnodeInt.h"
20

21
// TSDB commit-phase entry points, declared extern in this file.
extern int32_t tsdbPreCommit(STsdb *pTsdb);
22
extern int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo);
23
extern int32_t tsdbCommitCommit(STsdb *pTsdb);
24
extern int32_t tsdbCommitAbort(STsdb *pTsdb);
25

26
// Name of the temporary vnode info file: written by vnodeSaveInfo() and
// renamed to VND_INFO_FNAME by vnodeCommitInfo().
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
27

28
// Forward declarations of helpers defined later in this file.
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
29
static int vnodeCommitImpl(SCommitInfo *pInfo);
30

31
// Timed-wait interval used by vnodeGetBufPoolToUse() when no buffer pool is free.
#define WAIT_TIME_MILI_SEC 10  // milliseconds
32

UNCOV
33
/*
 * Try to recycle one buffer pool of the vnode.
 *
 * When no pool is currently on recycle (pVnode->onRecycle == NULL) the head
 * of the recycle queue is detached and becomes the pool on recycle. If the
 * queue is empty nothing is done and 0 is returned. Otherwise the result of
 * vnodeBufPoolRecycle() on pVnode->onRecycle is returned, and logged when it
 * is non-zero.
 *
 * The caller visible in this file (vnodeGetBufPoolToUse) invokes this with
 * pVnode->mutex held.
 */
static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) {
  if (pVnode->onRecycle == NULL) {
    SVBufPool *pPool = pVnode->recycleHead;

    if (pPool == NULL) {
      vDebug("vgId:%d, no recyclable buffer pool", TD_VID(pVnode));
      return 0;
    }

    vDebug("vgId:%d, buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pPool, pPool->id);

    // pop the head of the recycle queue and make it the pool on recycle
    pVnode->onRecycle = pPool;
    if (pPool == pVnode->recycleTail) {
      // it was the only element: the queue becomes empty
      pVnode->recycleHead = NULL;
      pVnode->recycleTail = NULL;
    } else {
      pVnode->recycleHead = pPool->recycleNext;
      pVnode->recycleHead->recyclePrev = NULL;
    }
    pPool->recycleNext = NULL;
    pPool->recyclePrev = NULL;
  }

  int32_t code = vnodeBufPoolRecycle(pVnode->onRecycle);
  if (code) {
    vError("vgId:%d, %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code));
  }
  return code;
}
64
/*
 * Pick a free buffer pool and install it as pVnode->inUse.
 *
 * Runs with pVnode->mutex held for the whole loop. If the free list is empty
 * it tries to recycle a pool; if that does not produce a free pool either, it
 * waits on pVnode->poolNotEmpty for WAIT_TIME_MILI_SEC milliseconds and
 * retries.
 *
 * Returns 0 once a pool has been taken from the free list, or the error code
 * of the failing step (recycle or condition wait).
 */
static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
  int32_t code = 0;
  int32_t lino = 0;

  (void)taosThreadMutexLock(&pVnode->mutex);

  int32_t nTry = 0;
  for (;;) {
    ++nTry;

    if (pVnode->freeList) {
      vDebug("vgId:%d, allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList,
             pVnode->freeList->id);

      // detach the head of the free list and start using it
      pVnode->inUse = pVnode->freeList;
      pVnode->inUse->nRef = 1;
      pVnode->freeList = pVnode->inUse->freeNext;
      pVnode->inUse->freeNext = NULL;
      break;
    } else {
      vDebug("vgId:%d, no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry);

      code = vnodeTryRecycleBufPool(pVnode);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (pVnode->freeList == NULL) {
        vDebug("vgId:%d, no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC);

        struct timeval  tv;
        struct timespec ts;
        if (taosGetTimeOfDay(&tv) != 0) {
          continue;
        }

        // absolute deadline = now + WAIT_TIME_MILI_SEC, normalised so tv_nsec < 1s
        ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000;
        if (ts.tv_nsec > 999999999l) {
          ts.tv_sec = tv.tv_sec + 1;
          ts.tv_nsec -= 1000000000l;
        } else {
          ts.tv_sec = tv.tv_sec;
        }

        code = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts);
        if (code == TSDB_CODE_TIMEOUT_ERROR) {
          // A timeout only means "retry". Clear it so that it cannot be
          // returned as the result when the next iteration finds a free pool.
          code = 0;
        }
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  (void)taosThreadMutexUnlock(&pVnode->mutex);
  if (code) {
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
120
/*
 * Begin a new write round on the vnode: obtain a buffer pool, then begin the
 * meta and tsdb components, and the sma component when VND_IS_RSMA(pVnode)
 * holds.
 *
 * Returns 0 on success. On failure the code of the first failing step is
 * stored in terrno, logged, and returned.
 */
int vnodeBegin(SVnode *pVnode) {
  int32_t lino = 0;

  // step 1: buffer pool
  int32_t code = vnodeGetBufPoolToUse(pVnode);
  TSDB_CHECK_CODE(code, lino, _exit);

  // step 2: meta
  code = metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL);
  TSDB_CHECK_CODE(code, lino, _exit);

  // step 3: tsdb
  code = tsdbBegin(pVnode->pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

  // step 4: sma, rsma vnodes only
  if (VND_IS_RSMA(pVnode)) {
    code = smaBegin(pVnode->pSma);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code != 0) {
    terrno = code;
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
149

150
/*
 * Decide whether the vnode should commit now.
 *
 * A commit is needed only if a buffer pool is in use and data disk space is
 * available, and either
 *   - the in-use pool's size exceeds its node size, or
 *   - atExit is set and the pool is non-empty, or the meta is marked changed,
 *     or more than 4096 versions were applied since the last commit.
 *
 * The decision is taken, and traced, under pVnode->mutex.
 * Returns non-zero when a commit is needed.
 */
int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
  bool diskAvail = osDataSpaceAvailable();
  bool needCommit = false;

  (void)taosThreadMutexLock(&pVnode->mutex);

  if (pVnode->inUse && diskAvail) {
    SVBufPool *pPool = pVnode->inUse;

    if (pPool->size > pPool->node.size) {
      needCommit = true;
    } else if (atExit) {
      needCommit = pPool->size > 0 || pVnode->pMeta->changed ||
                   pVnode->state.applied - pVnode->state.committed > 4096;
    }
  }

  vTrace("vgId:%d, should commit:%d, disk available:%d, buffer size:%" PRId64 ", node size:%" PRId64
         ", meta changed:%d"
         ", state:[%" PRId64 ",%" PRId64 "]",
         TD_VID(pVnode), needCommit, diskAvail, pVnode->inUse ? pVnode->inUse->size : 0,
         pVnode->inUse ? pVnode->inUse->node.size : 0, pVnode->pMeta->changed, pVnode->state.applied,
         pVnode->state.committed);

  (void)taosThreadMutexUnlock(&pVnode->mutex);
  return needCommit;
}
169

170
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
6,747✔
171
  int32_t   code = 0;
6,747✔
172
  int32_t   lino;
173
  char      fname[TSDB_FILENAME_LEN];
174
  TdFilePtr pFile = NULL;
6,747✔
175
  char     *data = NULL;
6,747✔
176

177
  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
6,747✔
178

179
  code = vnodeEncodeInfo(pInfo, &data);
6,747✔
180
  TSDB_CHECK_CODE(code, lino, _exit);
6,743!
181

182
  // save info to a vnode_tmp.json
183
  pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
6,743✔
184
  if (pFile == NULL) {
6,734!
185
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
186
  }
187

188
  if (taosWriteFile(pFile, data, strlen(data)) < 0) {
6,734!
189
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
190
  }
191

192
  if (taosFsyncFile(pFile) < 0) {
6,740!
193
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
194
  }
195

196
_exit:
6,745✔
197
  if (code) {
6,745!
198
    vError("vgId:%d %s failed at %s:%d since %s", pInfo->config.vgId, __func__, __FILE__, lino, tstrerror(code));
×
199
  } else {
200
    vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", pInfo->config.vgId, fname,
6,745!
201
          pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, pInfo->config.syncCfg.changeVersion);
202
  }
203
  if (taosCloseFile(&pFile) != 0) {
6,747!
204
    vError("vgId:%d, failed to close file", pInfo->config.vgId);
×
205
  }
206
  taosMemoryFree(data);
6,748✔
207
  return code;
6,748✔
208
}
209

210
int vnodeCommitInfo(const char *dir) {
6,747✔
211
  char fname[TSDB_FILENAME_LEN];
212
  char tfname[TSDB_FILENAME_LEN];
213

214
  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);
6,747✔
215
  snprintf(tfname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
6,747✔
216

217
  int32_t code = taosRenameFile(tfname, fname);
6,747✔
218
  if (code < 0) {
6,744!
219
    return code;
×
220
  }
221

222
  vInfo("vnode info is committed, dir:%s", dir);
6,744!
223
  return 0;
6,748✔
224
}
225

226
/*
 * Load the vnode info from "<dir><TD_DIRSEP>" VND_INFO_FNAME into pInfo.
 *
 * The whole file is read into a NUL-terminated buffer and decoded with
 * vnodeDecodeInfo(). On success the WAL config's committed version is set
 * from the decoded state.
 *
 * Returns 0 on success, otherwise the failing step's code. A failure to open
 * the file is returned but not logged; later failures are logged.
 */
int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
  int32_t   code = 0;
  int32_t   lino = 0;
  int64_t   fileSize = 0;
  char     *pData = NULL;
  TdFilePtr pFile = NULL;
  char      fname[TSDB_FILENAME_LEN];

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);

  // open the file and read all of it
  pFile = taosOpenFile(fname, TD_FILE_READ);
  if (pFile == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = taosFStatFile(pFile, &fileSize, NULL);
  TSDB_CHECK_CODE(code, lino, _exit);

  // one extra byte for the terminating NUL
  pData = taosMemoryMalloc(fileSize + 1);
  if (pData == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (taosReadFile(pFile, pData, fileSize) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  pData[fileSize] = '\0';

  // decode the JSON text
  code = vnodeDecodeInfo(pData, pInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  pInfo->config.walCfg.committed = pInfo->state.committed;

_exit:
  // only report errors that happened after the file was opened
  if (code && pFile) {
    vError("vgId:%d %s failed at %s:%d since %s", pInfo->config.vgId, __func__, __FILE__, lino, tstrerror(code));
  }
  taosMemoryFree(pData);
  if (taosCloseFile(&pFile) != 0) {
    vError("vgId:%d, failed to close file", pInfo->config.vgId);
  }
  return code;
}
273

274
/*
 * Prepare a commit: fill pInfo with the state to be committed and switch the
 * vnode's in-use buffer pool to "on commit".
 *
 * Steps, in order:
 *   1. wait for the previous commit task (pVnode->commitTask);
 *   2. refresh pVnode->config.syncCfg from the sync node;
 *   3. copy config and applied version/term into pInfo and assign a new
 *      commit ID (pVnode->state.commitID is incremented);
 *   4. save the info to the temporary info file in the primary directory;
 *   5. run the tsdb pre-commit and the meta/sma async-commit preparation;
 *   6. under pVnode->mutex, move inUse to onCommit and clear inUse.
 *
 * Returns 0 on success or the code of the first failing step.
 */
static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  char    dir[TSDB_FILENAME_LEN] = {0};

  // wait last commit task
  vnodeAWait(&pVnode->commitTask);

  code = syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg);
  TSDB_CHECK_CODE(code, lino, _exit);

  pVnode->state.commitTerm = pVnode->state.applyTerm;

  // snapshot what this commit is going to persist
  pInfo->info.config = pVnode->config;
  pInfo->info.state.committed = pVnode->state.applied;
  pInfo->info.state.commitTerm = pVnode->state.applyTerm;
  pInfo->info.state.commitID = ++pVnode->state.commitID;
  pInfo->pVnode = pVnode;
  pInfo->txn = metaGetTxn(pVnode->pMeta);

  // save info
  vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);

  vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
  code = vnodeSaveInfo(dir, &pInfo->info);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbPreCommit(pVnode->pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = metaPrepareAsyncCommit(pVnode->pMeta);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = smaPrepareAsyncCommit(pVnode->pSma);
  TSDB_CHECK_CODE(code, lino, _exit);

  // hand the in-use pool over to the commit; vnodeBegin() picks a new one
  (void)taosThreadMutexLock(&pVnode->mutex);
  pVnode->onCommit = pVnode->inUse;
  pVnode->inUse = NULL;
  (void)taosThreadMutexUnlock(&pVnode->mutex);

_exit:
  if (code) {
    vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino,
           tstrerror(code), pVnode->state.commitID);
  } else {
    vDebug("vgId:%d, %s done, commit id:%" PRId64, TD_VID(pVnode), __func__, pInfo->info.state.commitID);
  }

  return code;
}
326
/*
 * Release the commit's reference on the buffer pool that was on commit.
 *
 * Under pVnode->mutex: decrements the pool's nRef, clears pVnode->onCommit,
 * then
 *   - nRef == 0: puts the pool back on the free list;
 *   - nRef  > 0: appends the pool to the tail of the recycle queue;
 *   - nRef  < 0: logs an error and does nothing else.
 */
static void vnodeReturnBufPool(SVnode *pVnode) {
  (void)taosThreadMutexLock(&pVnode->mutex);

  SVBufPool *pPool = pVnode->onCommit;
  int32_t    nRef = atomic_sub_fetch_32(&pPool->nRef, 1);

  pVnode->onCommit = NULL;

  if (nRef < 0) {
    vError("vgId:%d, buffer pool %p of id %d nRef:%d", TD_VID(pVnode), pPool, pPool->id, nRef);
  } else if (nRef == 0) {
    vnodeBufPoolAddToFreeList(pPool);
  } else {
    vDebug("vgId:%d, buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id);

    // append to the tail; recyclePrev is NULL when the queue was empty
    pPool->recycleNext = NULL;
    pPool->recyclePrev = pVnode->recycleTail;
    if (pVnode->recycleTail == NULL) {
      pVnode->recycleHead = pPool;
    } else {
      pVnode->recycleTail->recycleNext = pPool;
    }
    pVnode->recycleTail = pPool;
  }

  (void)taosThreadMutexUnlock(&pVnode->mutex);
}
1,756✔
353
static int32_t vnodeCommit(void *arg) {
1,756✔
354
  int32_t code = 0;
1,756✔
355

356
  SCommitInfo *pInfo = (SCommitInfo *)arg;
1,756✔
357
  SVnode      *pVnode = pInfo->pVnode;
1,756✔
358

359
  // commit
360
  if ((code = vnodeCommitImpl(pInfo))) {
1,756!
361
    vFatal("vgId:%d, failed to commit vnode since %s", TD_VID(pVnode), terrstr());
×
362
    taosMsleep(100);
×
363
    exit(EXIT_FAILURE);
×
364
    goto _exit;
365
  }
366

367
  vnodeReturnBufPool(pVnode);
1,756✔
368

369
_exit:
1,756✔
370
  taosMemoryFree(arg);
1,756✔
371
  return code;
1,756✔
372
}
373

374
/* Cancel callback handed to vnodeAsync() together with vnodeCommit: frees the task argument. */
static void vnodeCommitCancel(void *arg) {
  taosMemoryFree(arg);
}
×
375

376
/*
 * Prepare a commit and schedule it on the vnode's commit channel.
 *
 * Allocates an SCommitInfo, fills it via vnodePrepareCommit(), and submits
 * vnodeCommit (with vnodeCommitCancel as cancel callback) at high priority.
 * The resulting task is recorded in pVnode->commitTask.
 *
 * Returns 0 when the task was scheduled. On failure the SCommitInfo is freed
 * here and the failing step's code is returned.
 */
int vnodeAsyncCommit(SVnode *pVnode) {
  int32_t code = 0;
  int32_t lino = 0;

  SCommitInfo *pInfo = (SCommitInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
  if (NULL == pInfo) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  // prepare to commit
  code = vnodePrepareCommit(pVnode, pInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  // schedule the task
  code =
      vnodeAsync(&pVnode->commitChannel, EVA_PRIORITY_HIGH, vnodeCommit, vnodeCommitCancel, pInfo, &pVnode->commitTask);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    taosMemoryFree(pInfo);
    // The format string used to end in a bare PRId64, which appended the
    // literal characters "ld"/"lld" to the error text.
    vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64, TD_VID(pVnode),
          pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied);
  }
  return code;
}
404

405
/*
 * Synchronous commit: schedule an async commit and wait for
 * pVnode->commitTask to finish.
 *
 * Returns 0 on success, or the code returned by vnodeAsyncCommit(); in that
 * case nothing is waited for.
 */
int32_t vnodeSyncCommit(SVnode *pVnode) {
  int32_t lino = 0;
  int32_t code = vnodeAsyncCommit(pVnode);
  TSDB_CHECK_CODE(code, lino, _exit);

  // block until the scheduled commit task has run
  vnodeAWait(&pVnode->commitTask);

_exit:
  if (code == 0) {
    vInfo("vgId:%d, sync commit end", TD_VID(pVnode));
  } else {
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }

  return code;
}
420

421
/*
 * Perform the commit described by pInfo.
 *
 * Order of operations: persist the WAL, begin the sync snapshot, begin the
 * tsdb commit, commit the tsdb cache (unless TSDB_CACHE_NO), commit sma (rsma
 * vnodes only), rename the temporary info file into place, finish the
 * tsdb/sma/meta commits, publish the new committed version in pVnode->state,
 * run the sma post-commit, and end the sync snapshot.
 *
 * Returns 0 on success. A WAL persist failure returns its negative code, and
 * an sma post-commit failure returns -1; both bypass the common exit log.
 * Any other failure returns the failing step's code after logging it.
 */
static int vnodeCommitImpl(SCommitInfo *pInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  SVnode *pVnode = pInfo->pVnode;
  char    dir[TSDB_FILENAME_LEN] = {0};

  vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
        pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm);

  // the WAL must be on disk before anything else is touched
  code = walPersist(pVnode->pWal);
  if (code < 0) {
    vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), tstrerror(code));
    return code;
  }

  vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);

  code = syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbCommitBegin(pVnode->pTsdb, pInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (!TSDB_CACHE_NO(pVnode->config)) {
    code = tsdbCacheCommit(pVnode->pTsdb);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (VND_IS_RSMA(pVnode)) {
    code = smaCommit(pVnode->pSma, pInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // make the saved info file the current one
  code = vnodeCommitInfo(dir);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbCommitCommit(pVnode->pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (VND_IS_RSMA(pVnode)) {
    code = smaFinishCommit(pVnode->pSma);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = metaFinishCommit(pVnode->pMeta, pInfo->txn);
  TSDB_CHECK_CODE(code, lino, _exit);

  // the commit is durable from here on
  pVnode->state.committed = pInfo->info.state.committed;

  if (smaPostCommit(pVnode->pSma) < 0) {
    vError("vgId:%d, failed to post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  code = syncEndSnapshot(pVnode->sync);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    vInfo("vgId:%d, commit end", TD_VID(pVnode));
  } else {
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
488

489
/*
 * Report whether the temporary info file (VND_INFO_FNAME_TMP) exists in the
 * vnode's primary directory.
 */
bool vnodeShouldRollback(SVnode *pVnode) {
  char tmpInfoPath[TSDB_FILENAME_LEN] = {0};

  vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tmpInfoPath, TSDB_FILENAME_LEN);

  // append "<sep>vnode_tmp.json" after the directory part
  int32_t dirLen = strlen(tmpInfoPath);
  snprintf(tmpInfoPath + dirLen, TSDB_FILENAME_LEN - dirLen - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);

  return taosCheckExistFile(tmpInfoPath);
}
499

500
/*
 * Remove the temporary info file (VND_INFO_FNAME_TMP) from the vnode's
 * primary directory. A removal failure is logged and otherwise ignored.
 */
void vnodeRollback(SVnode *pVnode) {
  char tmpInfoPath[TSDB_FILENAME_LEN] = {0};

  vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tmpInfoPath, TSDB_FILENAME_LEN);

  // append "<sep>vnode_tmp.json" after the directory part
  int32_t dirLen = strlen(tmpInfoPath);
  snprintf(tmpInfoPath + dirLen, TSDB_FILENAME_LEN - dirLen - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);

  int32_t rc = taosRemoveFile(tmpInfoPath);
  if (rc != 0) {
    vError("vgId:%d, failed to remove file %s since %s", TD_VID(pVnode), tmpInfoPath, tstrerror(terrno));
  }
}
×
512

513
/*
 * JSON encoder for an SVState, used as a tjsonAddObject() callback.
 *
 * pObj  - the SVState to encode
 * pJson - object receiving "commit version", "commit ID" and "commit term"
 *
 * Returns 0 when all three fields were added; TAOS_CHECK_RETURN handles a
 * failing add.
 */
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
  const SVState *pSrc = (SVState *)pObj;

  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "commit version", pSrc->committed));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "commit ID", pSrc->commitID));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "commit term", pSrc->commitTerm));

  return 0;
}
522

523
static int vnodeDecodeState(const SJson *pJson, void *pObj) {
5,688✔
524
  SVState *pState = (SVState *)pObj;
5,688✔
525

526
  int32_t code;
527
  tjsonGetNumberValue(pJson, "commit version", pState->committed, code);
5,688✔
528
  if (code) return code;
5,693!
529
  tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code);
5,693✔
530
  if (code) return code;
5,691!
531
  tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code);
5,691✔
532
  if (code) return code;
5,693!
533

534
  return 0;
5,693✔
535
}
536

537
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
6,746✔
538
  int32_t code = 0;
6,746✔
539
  int32_t lino;
540
  SJson  *pJson = NULL;
6,746✔
541
  char   *pData = NULL;
6,746✔
542

543
  pJson = tjsonCreateObject();
6,746✔
544
  if (pJson == NULL) {
6,744!
545
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
546
  }
547

548
  code = tjsonAddObject(pJson, "config", vnodeEncodeConfig, (void *)&pInfo->config);
6,744✔
549
  TSDB_CHECK_CODE(code, lino, _exit);
6,747!
550

551
  code = tjsonAddObject(pJson, "state", vnodeEncodeState, (void *)&pInfo->state);
6,747✔
552
  TSDB_CHECK_CODE(code, lino, _exit);
6,744!
553

554
  pData = tjsonToString(pJson);
6,744✔
555
  if (pData == NULL) {
6,740!
556
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
557
  }
558

559
  tjsonDelete(pJson);
6,740✔
560

561
_exit:
6,746✔
562
  if (code) {
6,746!
563
    tjsonDelete(pJson);
×
564
    *ppData = NULL;
×
565
  } else {
566
    *ppData = pData;
6,746✔
567
  }
568
  return code;
6,746✔
569
}
570

571
/*
 * Decode a JSON text into pInfo.
 *
 * pData - JSON text to parse (vnodeLoadInfo() passes a NUL-terminated buffer)
 * pInfo - receives the decoded "config" and "state" objects
 *
 * Returns 0 on success, TSDB_CODE_INVALID_DATA_FMT when the text cannot be
 * parsed, or the code of the failing object decoder. The parsed JSON tree is
 * deleted before returning.
 */
int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  SJson  *pRoot = tjsonParse(pData);

  if (pRoot == NULL) {
    code = TSDB_CODE_INVALID_DATA_FMT;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tjsonToObject(pRoot, "config", vnodeDecodeConfig, (void *)&pInfo->config);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tjsonToObject(pRoot, "state", vnodeDecodeState, (void *)&pInfo->state);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  tjsonDelete(pRoot);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc