• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add basic test for alter table option.

* Fix(keep): memory leak.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stb keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.91
/source/dnode/vnode/src/vnd/vnodeSync.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "sync.h"
18
#include "tq.h"
19
#include "tqCommon.h"
20
#include "tsdb.h"
21
#include "vnd.h"
22

23
#define BATCH_ENABLE 0
24

25
// All vnode write messages are currently proposed as strong (non-weak) raft entries.
static inline bool vnodeIsMsgWeak(tmsg_t type) {
  (void)type;  // reserved for future per-type weak-sync decisions
  return false;
}
26

UNCOV
27
// Block the proposing thread on syncSem until the apply thread (or a role
// change / timeout path) posts it for this blocking message.
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;

  vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
          TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);

  if (tsem_wait(&pVnode->syncSem) != 0) {
    vError("vgId:%d, failed to wait sem", pVnode->config.vgId);
  }
}
35

36
// Wake up a writer thread that is parked in vnodeWaitBlockMsg, if any.
// No-op for message types that never block.
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
  if (!vnodeIsMsgBlock(pMsg->msgType)) {
    return;
  }

  const STraceId *trace = &pMsg->info.traceId;

  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
            TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
    // Clear the block bookkeeping before releasing the waiter.
    pVnode->blocked = false;
    pVnode->blockSec = 0;
    pVnode->blockSeq = 0;
    if (tsem_post(&pVnode->syncSem) != 0) {
      vError("vgId:%d, failed to post sem", pVnode->config.vgId);
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}
53

UNCOV
54
// Reply to the client with a redirect response carrying the current retry
// endpoint set, used when this vnode cannot serve the request (not leader).
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
  SEpSet newEpSet = {0};
  syncGetRetryEpSet(pVnode->sync, &newEpSet);

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
          newEpSet.numOfEps, newEpSet.inUse);
  for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
    vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn,
            newEpSet.eps[i].port);
  }
  pMsg->info.hasEpSet = 1;

  // Preserve a caller-provided error; default to "not leader".
  if (code == 0) code = TSDB_CODE_SYN_NOT_LEADER;

  SRpcMsg rsp = {.code = code, .info = pMsg->info, .msgType = pMsg->msgType + 1};
  int32_t contLen = tSerializeSEpSet(NULL, 0, &newEpSet);

  rsp.pCont = rpcMallocCont(contLen);
  if (rsp.pCont == NULL) {
    pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    if (tSerializeSEpSet(rsp.pCont, contLen, &newEpSet) < 0) {
      vError("vgId:%d, failed to serialize ep set", pVnode->config.vgId);
    }
    rsp.contLen = contLen;
  }

  tmsgSendRsp(&rsp);
}
84

85
// Apply a write message immediately (single-replica fast path) and send the
// response, or free the response payload when the caller expects no reply.
static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};

  if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
    rsp.code = terrno;
    const STraceId *trace = &pMsg->info.traceId;
    vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
  }

  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);  // ownership of rsp.pCont passes to the transport
  } else if (rsp.pCont) {
    rpcFreeCont(rsp.pCont);  // no one to reply to; avoid leaking the payload
  }
}
100

UNCOV
101
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
×
UNCOV
102
  if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
×
UNCOV
103
    vnodeRedirectRpcMsg(pVnode, pMsg, code);
×
UNCOV
104
  } else if (code == TSDB_CODE_MSG_PREPROCESSED) {
×
UNCOV
105
    SRpcMsg rsp = {.code = TSDB_CODE_SUCCESS, .info = pMsg->info};
×
UNCOV
106
    if (rsp.info.handle != NULL) {
×
UNCOV
107
      tmsgSendRsp(&rsp);
×
108
    }
109
  } else {
UNCOV
110
    const STraceId *trace = &pMsg->info.traceId;
×
UNCOV
111
    vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);
×
UNCOV
112
    SRpcMsg rsp = {.code = code, .info = pMsg->info};
×
UNCOV
113
    if (rsp.info.handle != NULL) {
×
UNCOV
114
      tmsgSendRsp(&rsp);
×
115
    }
116
  }
UNCOV
117
}
×
118

119
// Propose one write message to raft.
//
// Return value mirrors syncPropose: 0 means the entry was accepted and will be
// applied asynchronously (blocking messages additionally wait here until the
// apply thread posts syncSem); >0 means apply-right-now (single replica);
// <0 means the proposal failed and an error response is sent.
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
  int64_t seq = 0;

  (void)taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
  bool    wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
  if (wait) {
    if (pVnode->blocked) {
      // BUGFIX: release the vnode lock before bailing out. The original code
      // returned while still holding pVnode->lock, deadlocking every
      // subsequent writer on this vnode.
      (void)taosThreadMutexUnlock(&pVnode->lock);
      return TSDB_CODE_INTERNAL_ERROR;
    }
    pVnode->blocked = true;
    pVnode->blockSec = taosGetTimestampSec();
    pVnode->blockSeq = seq;
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    vnodeHandleWriteMsg(pVnode, pMsg);
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    vnodeHandleProposeError(pVnode, pMsg, code);
  }

  if (wait) vnodeWaitBlockMsg(pVnode, pMsg);
  return code;
}
145

146
// Propose a TDMT_VND_COMMIT message when the vnode's buffered state warrants a
// commit. At exit time the message is queued directly to the write queue;
// otherwise it goes through the normal raft propose path.
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
  if (!vnodeShouldCommit(pVnode, atExit)) {
    return;
  }

  int32_t   contLen = sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    // BUGFIX: rpcMallocCont can fail; the original code dereferenced the NULL
    // pointer unconditionally. Skipping the commit is safe: it will be
    // re-proposed on the next trigger.
    vError("vgId:%d, failed to propose vnode commit since out of memory", pVnode->config.vgId);
    return;
  }
  pHead->contLen = contLen;
  pHead->vgId = pVnode->config.vgId;

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_VND_COMMIT;
  rpcMsg.contLen = contLen;
  rpcMsg.pCont = pHead;
  rpcMsg.info.noResp = 1;  // internal message, no client to answer

  vInfo("vgId:%d, propose vnode commit", pVnode->config.vgId);
  bool isWeak = false;

  if (!atExit) {
    if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
      vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
    }
    rpcFreeCont(rpcMsg.pCont);
    rpcMsg.pCont = NULL;
  } else {
    int32_t code = 0;
    if ((code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg)) < 0) {
      vError("vgId:%d, failed to put vnode commit to write_queue since %s", pVnode->config.vgId, tstrerror(code));
    }
  }
}
178

179
#if BATCH_ENABLE
180

181
// Flush the accumulated batch of write messages to raft as one proposal,
// then respond/free each message. (Compiled only when BATCH_ENABLE is set.)
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
  if (*arrSize <= 0) return;
  SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];

  (void)taosThreadMutexLock(&pVnode->lock);
  int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
  // BUGFIX: the helper is named vnodeIsMsgBlock everywhere else in this file;
  // the original called vnodeIsBlockMsg, which does not exist and breaks the
  // build as soon as BATCH_ENABLE is turned on.
  bool wait = (code == 0 && vnodeIsMsgBlock(pLastMsg->msgType));
  if (wait) {
    pVnode->blocked = true;
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);

  if (code > 0) {
    // Apply-right-now path: process every message locally.
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
    }
  } else if (code < 0) {
    if (terrno != 0) code = terrno;
    for (int32_t i = 0; i < *arrSize; ++i) {
      vnodeHandleProposeError(pVnode, pMsgArr[i], code);
    }
  }

  if (wait) vnodeWaitBlockMsg(pVnode, pLastMsg);
  pLastMsg = NULL;

  // Release every queued message regardless of outcome.
  for (int32_t i = 0; i < *arrSize; ++i) {
    SRpcMsg        *pMsg = pMsgArr[i];
    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  *arrSize = 0;
}
217

218
// Batched variant of the write-queue drain loop (BATCH_ENABLE build).
// Accumulates messages into an array and proposes them in one raft batch;
// blocking messages force a flush before and after themselves.
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode   *pVnode = pInfo->ahandle;
  int32_t   vgId = pVnode->config.vgId;
  int32_t   code = 0;
  SRpcMsg  *pMsg = NULL;
  int32_t   arrayPos = 0;
  SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
  bool     *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
    bool isBlock = vnodeIsMsgBlock(pMsg->msgType);

    const STraceId *trace = &pMsg->info.traceId;
    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
            isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);

    if (!pVnode->restored) {
      vGWarn("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_SYN_RESTORING;
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    if (pMsgArr == NULL || pIsWeakArr == NULL) {
      vGError("vgId:%d, msg:%p failed to process since out of memory, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType));
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      vnodeHandleProposeError(pVnode, pMsg, terrno);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    bool atExit = false;
    vnodeProposeCommitOnNeed(pVnode, atExit);

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
      // BUGFIX(consistency): the non-batch path replies to the client on
      // pre-process failure; the batch path silently dropped the request,
      // leaving the caller waiting for a response that never comes.
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    // A blocking message must sit alone in its batch: flush what we have...
    if (isBlock) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }

    pMsgArr[arrayPos] = pMsg;
    pIsWeakArr[arrayPos] = isWeak;
    arrayPos++;

    // ...and flush again right after it (or at end of queue).
    if (isBlock || msg == numOfMsgs - 1) {
      vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
    }
  }

  taosMemoryFree(pMsgArr);
  taosMemoryFree(pIsWeakArr);
}
283

284
#else
285

286
// Drain the vnode write queue, proposing each message to raft one at a time.
// Messages arriving before restore completes are rejected with SYN_RESTORING.
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;

  vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);

  for (int32_t msg = 0; msg < numOfMsgs; msg++) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    bool            isWeak = vnodeIsMsgWeak(pMsg->msgType);
    const STraceId *trace = &pMsg->info.traceId;

    vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p", vgId, pMsg, isWeak,
            vnodeIsMsgBlock(pMsg->msgType), msg, numOfMsgs, pMsg->info.handle);

    // Reject writes until the raft log has been fully replayed.
    if (!pVnode->restored) {
      vGWarn("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg,
             TMSG_INFO(pMsg->msgType));
      vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    bool atExit = false;
    vnodeProposeCommitOnNeed(pVnode, atExit);

    code = vnodePreProcessWriteMsg(pVnode, pMsg);
    if (code != 0) {
      // MSG_PREPROCESSED means the request was fully answered during
      // pre-processing; it is routed through the error handler as success.
      if (code != TSDB_CODE_MSG_PREPROCESSED) {
        vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, tstrerror(code));
      }
      vnodeHandleProposeError(pVnode, pMsg, code);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
      continue;
    }

    code = vnodeProposeMsg(pVnode, pMsg, isWeak);

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}
331

332
#endif
333

334
// Drain the vnode apply queue: execute each committed write against the
// state machine, wake any blocked proposer, and answer the client.
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnode  *pVnode = pInfo->ahandle;
  int32_t  vgId = pVnode->config.vgId;
  int32_t  code = 0;
  SRpcMsg *pMsg = NULL;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    const STraceId *trace = &pMsg->info.traceId;

    if (vnodeIsMsgBlock(pMsg->msgType)) {
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64
              ", blocking msg obtained sec:%d seq:%" PRId64,
              vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec,
              pVnode->blockSeq);
    } else {
      vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
              TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
    }

    SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
    if (rsp.code == 0) {
      if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
        rsp.code = terrno;
        vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
                pMsg->info.conn.applyIndex);
      }
    }

    // Release any proposer parked on this blocking message, then reply.
    vnodePostBlockMsg(pVnode, pMsg);
    if (rsp.info.handle != NULL) {
      tmsgSendRsp(&rsp);
    } else if (rsp.pCont) {
      rpcFreeCont(rsp.pCont);
    }

    vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}
377

UNCOV
378
// Forward an incoming raft protocol message to the sync module.
// pRsp is unused by the current implementation.
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = syncProcessMsg(pVnode->sync, pMsg);
  if (code != 0) {
    vGError("vgId:%d, failed to process sync msg:%p type:%s, reason: %s", pVnode->config.vgId, pMsg,
            TMSG_INFO(pMsg->msgType), tstrerror(code));
  }

  return code;
}
390

391
// Sync-module callback: enqueue a control message onto the SYNC_RD_QUEUE.
// Frees the message payload on any failure so the caller never leaks it.
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
409

UNCOV
410
// Sync-module callback: enqueue a message onto the SYNC_QUEUE.
// Mirrors vnodeSyncEqCtrlMsg, differing only in the target queue.
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
428

UNCOV
429
// Sync-module callback: send a raft message to a peer; on failure the
// payload is released here so ownership never dangles.
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t code = tmsgSendSyncReq(pEpSet, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
437

438
// FSM callback: report the vnode's current snapshot metadata to sync.
static int32_t vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  return vnodeGetSnapshot(pFsm->data, pSnapshot);
}
441

442
// Stamp a committed entry with its raft index/term and hand it to the
// apply queue for asynchronous execution.
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;
  pMsg->info.conn.applyIndex = pMeta->index;
  pMsg->info.conn.applyTerm = pMeta->term;

  const STraceId *trace = &pMsg->info.traceId;
  vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
          ", weak:%d, code:%d, state:%d %s, type:%s code:0x%x",
          pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
          pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);

  int32_t code = tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
  if (code < 0) vError("vgId:%d, failed to put into apply_queue since %s", pVnode->config.vgId, tstrerror(code));
  return code;
}
457

458
// FSM commit callback. A clean entry goes to the apply queue; an entry
// carrying an error short-circuits: wake any blocked proposer, reply with
// the error, and free the message here.
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  if (pMsg->code == 0) {
    return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
  }

  const STraceId *trace = &pMsg->info.traceId;
  SVnode         *pVnode = pFsm->data;
  vnodePostBlockMsg(pVnode, pMsg);

  SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
  if (rsp.info.handle != NULL) {
    tmsgSendRsp(&rsp);
  }

  vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code, pMeta->index);
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return 0;
}
477

478
// FSM pre-commit callback: weak entries are applied at pre-commit time;
// strong entries wait for the commit callback.
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  if (pMeta->isWeak != 1) {
    return 0;
  }
  return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
484

485
static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
105✔
486
  SVnode *pVnode = pFSM->data;
105✔
487
  return atomic_load_64(&pVnode->state.applied);
105✔
488
}
489

490
// FSM rollback callback: log-only; the vnode performs no state rollback here.
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
  SVnode *pVnode = pFsm->data;
  vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
         pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
         TMSG_INFO(pMsg->msgType));
}
496

UNCOV
497
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
×
UNCOV
498
  SVnode *pVnode = pFsm->data;
×
UNCOV
499
  return vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader);
×
500
}
501

UNCOV
502
static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
×
UNCOV
503
  SVnode *pVnode = pFsm->data;
×
UNCOV
504
  vnodeSnapReaderClose(pReader);
×
UNCOV
505
}
×
506

UNCOV
507
// Snapshot-send side: produce the next snapshot block into *ppBuf/len.
static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  SVnode *pVnode = pFsm->data;
  return vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
}
511

UNCOV
512
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
×
UNCOV
513
  SVnode *pVnode = pFsm->data;
×
514

515
  do {
×
UNCOV
516
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
×
UNCOV
517
    if (itemSize == 0) {
×
UNCOV
518
      vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
×
UNCOV
519
      break;
×
520
    } else {
521
      vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId, itemSize);
×
522
      taosMsleep(10);
×
523
    }
524
  } while (true);
525

UNCOV
526
  return vnodeSnapWriterOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapWriter **)ppWriter);
×
527
}
528

UNCOV
529
// Snapshot-receive side: finish (isApply=true) or abandon (isApply=false)
// the snapshot being written.
static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  SVnode *pVnode = pFsm->data;
  vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
        pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  // vnodeSnapWriterClose takes a "rollback" flag, i.e. the inverse of isApply.
  int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
  if (code != 0) {
    vError("vgId:%d, failed to finish applying vnode snapshot since %s, code:0x%x", pVnode->config.vgId, terrstr(),
           code);
  }
  return code;
}
541

UNCOV
542
// Snapshot-receive side: write one snapshot block into the vnode.
static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
  SVnode *pVnode = pFsm->data;
  vDebug("vgId:%d, continue write vnode snapshot, blockLen:%d", pVnode->config.vgId, len);
  int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
  vDebug("vgId:%d, continue write vnode snapshot finished, blockLen:%d", pVnode->config.vgId, len);
  return code;
}
549

550
static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
20✔
551
  SVnode   *pVnode = pFsm->data;
20✔
552
  int32_t   vgId = pVnode->config.vgId;
20✔
553
  SyncIndex appliedIdx = -1;
20✔
554

555
  do {
556
    appliedIdx = vnodeSyncAppliedIndex(pFsm);
29✔
557
    if (appliedIdx > commitIdx) {
29!
558
      vError("vgId:%d, restore failed since applied-index:%" PRId64 " is larger than commit-index:%" PRId64, vgId,
×
559
             appliedIdx, commitIdx);
560
      break;
×
561
    }
562
    if (appliedIdx == commitIdx) {
29✔
563
      vInfo("vgId:%d, no items to be applied, restore finish", pVnode->config.vgId);
20!
564
      break;
20✔
565
    } else {
566
      vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
9!
567
            ", applied-index:%" PRId64,
568
            vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
569
      taosMsleep(10);
9✔
570
    }
571
  } while (true);
572

573
  walApplyVer(pVnode->pWal, commitIdx);
20✔
574
  pVnode->restored = true;
20✔
575

576
#ifdef USE_STREAM
577
  SStreamMeta *pMeta = pVnode->pTq->pStreamMeta;
20✔
578
  streamMetaWLock(pMeta);
20✔
579

580
  if (pMeta->startInfo.tasksWillRestart) {
20!
581
    vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
×
582
    streamMetaWUnLock(pMeta);
×
UNCOV
583
    return;
×
584
  }
585

586
  if (vnodeIsRoleLeader(pVnode)) {
20!
587
    // start to restore all stream tasks
588
    if (tsDisableStream) {
20!
UNCOV
589
      vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
×
590
    } else {
591
      vInfo("vgId:%d sync restore finished, start to launch stream task(s)", vgId);
20!
592
      if (pMeta->startInfo.startAllTasks == 1) {
20!
593
        pMeta->startInfo.restartCount += 1;
×
UNCOV
594
        vDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
×
595
               pMeta->startInfo.restartCount);
596
      } else {
597
        pMeta->startInfo.startAllTasks = 1;
20✔
598
        streamMetaWUnLock(pMeta);
20✔
599

600
        tqInfo("vgId:%d stream task already loaded, start them", vgId);
20!
601
        int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS, false);
20✔
602
        if (code != 0) {
20!
UNCOV
603
          tqError("vgId:%d failed to sched stream task, code:%s", vgId, tstrerror(code));
×
604
        }
605
        return;
20✔
606
      }
607
    }
608
  } else {
UNCOV
609
    vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
×
610
  }
611

UNCOV
612
  streamMetaWUnLock(pMeta);
×
613
#endif
614
}
615

UNCOV
616
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
×
UNCOV
617
  SVnode *pVnode = pFsm->data;
×
UNCOV
618
  vInfo("vgId:%d, become follower", pVnode->config.vgId);
×
619

620
  (void)taosThreadMutexLock(&pVnode->lock);
×
621
  if (pVnode->blocked) {
×
622
    pVnode->blocked = false;
×
623
    vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
×
UNCOV
624
    if (tsem_post(&pVnode->syncSem) != 0) {
×
UNCOV
625
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
×
626
    }
627
  }
UNCOV
628
  (void)taosThreadMutexUnlock(&pVnode->lock);
×
629

630
#ifdef USE_TQ
631
  if (pVnode->pTq) {
×
UNCOV
632
    tqUpdateNodeStage(pVnode->pTq, false);
×
UNCOV
633
    if (tqStopStreamAllTasksAsync(pVnode->pTq->pStreamMeta, &pVnode->msgCb) != 0) {
×
UNCOV
634
      vError("vgId:%d, failed to stop stream tasks", pVnode->config.vgId);
×
635
    }
636
  }
637
#endif
UNCOV
638
}
×
639

UNCOV
640
static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
×
UNCOV
641
  SVnode *pVnode = pFsm->data;
×
642
  vInfo("vgId:%d, become learner", pVnode->config.vgId);
×
643

644
  (void)taosThreadMutexLock(&pVnode->lock);
×
645
  if (pVnode->blocked) {
×
UNCOV
646
    pVnode->blocked = false;
×
UNCOV
647
    vDebug("vgId:%d, become learner and post block", pVnode->config.vgId);
×
UNCOV
648
    if (tsem_post(&pVnode->syncSem) != 0) {
×
UNCOV
649
      vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
×
650
    }
651
  }
UNCOV
652
  (void)taosThreadMutexUnlock(&pVnode->lock);
×
UNCOV
653
}
×
654

655
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
20✔
656
  SVnode *pVnode = pFsm->data;
20✔
657
  vDebug("vgId:%d, become leader", pVnode->config.vgId);
20!
658
#ifdef USE_TQ
659
  if (pVnode->pTq) {
20!
660
    tqUpdateNodeStage(pVnode->pTq, true);
20✔
661
  }
662
#endif
663
}
20✔
664

665
static void vnodeBecomeAssignedLeader(const SSyncFSM *pFsm) {
×
UNCOV
666
  SVnode *pVnode = pFsm->data;
×
667
  vDebug("vgId:%d, become assigned leader", pVnode->config.vgId);
×
668
#ifdef USE_TQ
UNCOV
669
  if (pVnode->pTq) {
×
670
    tqUpdateNodeStage(pVnode->pTq, true);
×
671
  }
672
#endif
UNCOV
673
}
×
674

UNCOV
675
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
×
UNCOV
676
  SVnode *pVnode = pFsm->data;
×
677

UNCOV
678
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
×
UNCOV
679
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
×
UNCOV
680
    return (itemSize == 0);
×
681
  } else {
UNCOV
682
    return true;
×
683
  }
684
}
685

UNCOV
686
static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) {
×
UNCOV
687
  SVnode *pVnode = pFsm->data;
×
688

UNCOV
689
  if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) {
×
UNCOV
690
    int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
×
UNCOV
691
    return itemSize;
×
692
  } else {
693
    return TSDB_CODE_INVALID_PARA;
×
694
  }
695
}
696

697
// Allocate and wire up the raft FSM callback table for this vnode.
// Returns NULL (terrno set) on allocation failure; caller owns the result.
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pFsm->data = pVnode;

  // Log replication / apply callbacks.
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
  pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex;
  pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
  pFsm->FpApplyQueueItems = vnodeApplyQueueItems;

  // Restore / role-change callbacks.
  pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
  pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
  pFsm->FpBecomeAssignedLeaderCb = vnodeBecomeAssignedLeader;
  pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
  pFsm->FpBecomeLearnerCb = vnodeBecomeLearner;

  // Snapshot transfer callbacks.
  pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
  pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
  pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
  pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;

  // Unused hooks, left explicitly NULL (calloc already zeroed them).
  pFsm->FpGetSnapshot = NULL;
  pFsm->FpAfterRestoredCb = NULL;
  pFsm->FpLeaderTransferCb = NULL;
  pFsm->FpReConfigCb = NULL;

  return pFsm;
}
729

730
// Open the raft instance for this vnode: build the FSM, fill SSyncInfo from
// the vnode config, and call syncOpen. Returns 0 on success, terrno otherwise.
int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) {
  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
      .batchSize = 1,
      .vgId = pVnode->config.vgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = &pVnode->msgCb,
      .syncSendMSg = vnodeSyncSendMsg,
      .syncEqMsg = vnodeSyncEqMsg,
      .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 4000,
      .heartbeatMs = 700,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);

  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
  if (syncInfo.pFsm == NULL) {
    // BUGFIX: vnodeSyncMakeFsm returns NULL on OOM (terrno set); the original
    // passed the NULL FSM straight into syncOpen.
    vError("vgId:%d, failed to make fsm since %s", pVnode->config.vgId, terrstr());
    return terrno;
  }

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
  for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn,
          pNode->nodePort, pNode->nodeId, pNode->clusterId);
  }

  pVnode->sync = syncOpen(&syncInfo, vnodeVersion);
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return terrno;
  }

  return 0;
}
765

766
// Start the raft instance previously opened by vnodeSyncOpen.
int32_t vnodeSyncStart(SVnode *pVnode) {
  vInfo("vgId:%d, start sync", pVnode->config.vgId);
  int32_t code = syncStart(pVnode->sync);
  if (code != 0) {
    vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, tstrerror(code));
  }
  return code;
}
775

776
// Prepare the vnode's sync subsystem for shutdown: try to hand leadership to
// another replica (best effort), pre-stop sync, then release any proposer that
// is still blocked on the sync semaphore so close can proceed.
void vnodeSyncPreClose(SVnode *pVnode) {
  vInfo("vgId:%d, sync pre close", pVnode->config.vgId);

  // Leader transfer is best effort; failure is logged but does not abort close.
  int32_t ret = syncLeaderTransfer(pVnode->sync);
  if (ret != 0) {
    vError("vgId:%d, failed to transfer leader since %s", pVnode->config.vgId, tstrerror(ret));
  }
  syncPreStop(pVnode->sync);

  // Wake a proposer parked on syncSem, if any, so it does not hang forever.
  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
    pVnode->blocked = false;
    if (tsem_post(&pVnode->syncSem) != 0) {
      vError("vgId:%d, failed to post block", pVnode->config.vgId);
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}
20✔
794

795
// Final stage of sync shutdown: forwards to syncPostStop().
void vnodeSyncPostClose(SVnode *pVnode) {
  vInfo("vgId:%d, sync post close", pVnode->config.vgId);
  syncPostStop(pVnode->sync);
}
20✔
799

800
// Stop the vnode's sync handle.
void vnodeSyncClose(SVnode *pVnode) {
  vInfo("vgId:%d, close sync", pVnode->config.vgId);
  syncStop(pVnode->sync);
}
20✔
804

UNCOV
805
// Periodic watchdog for a blocked proposal: if a proposer has been waiting on
// the sync semaphore for longer than VNODE_TIMEOUT_SEC, send a timeout
// response for the blocked sequence, clear the block state, and release the
// waiter. All block-state fields are protected by pVnode->lock.
void vnodeSyncCheckTimeout(SVnode *pVnode) {
  vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
  (void)taosThreadMutexLock(&pVnode->lock);
  if (pVnode->blocked) {
    // blockSec records when the proposal started waiting; compare wall-clock
    // seconds against the timeout threshold.
    int32_t curSec = taosGetTimestampSec();
    int32_t delta = curSec - pVnode->blockSec;
    if (delta > VNODE_TIMEOUT_SEC) {
      vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
             pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
      // Failure to deliver the timeout response is tolerated; the disabled
      // fallback below used to answer directly over RPC.
      if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
#if 0
        SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo};
        vError("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq,
              rpcMsg.info.handle, rpcMsg.info.ahandle);
        rpcSendResponse(&rpcMsg);
#endif
      }
      // Reset block bookkeeping before waking the waiter.
      pVnode->blocked = false;
      pVnode->blockSec = 0;
      pVnode->blockSeq = 0;
      if (tsem_post(&pVnode->syncSem) != 0) {
        vError("vgId:%d, failed to post block", pVnode->config.vgId);
      }
    }
  }
  (void)taosThreadMutexUnlock(&pVnode->lock);
}
×
832

833
// Report whether this vnode's sync role is currently LEADER.
bool vnodeIsRoleLeader(SVnode *pVnode) {
  return syncGetState(pVnode->sync).state == TAOS_SYNC_STATE_LEADER;
}
837

838
// Report whether this vnode can currently serve as leader: it must not be
// stopping, its sync role must be LEADER, and both the sync layer and the
// vnode itself must be fully restored. On a false result, terrno is set to
// the reason (TSDB_CODE_SYN_NOT_LEADER / TSDB_CODE_SYN_RESTORING) except in
// the stopping case, where syncGetState() has already set it.
bool vnodeIsLeader(SVnode *pVnode) {
  terrno = 0;
  SSyncState syncState = syncGetState(pVnode->sync);

  // syncGetState() sets terrno when the vnode is shutting down.
  if (terrno != 0) {
    vInfo("vgId:%d, vnode is stopping", pVnode->config.vgId);
    return false;
  }

  if (syncState.state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    vInfo("vgId:%d, vnode not leader, state:%s", pVnode->config.vgId, syncStr(syncState.state));
    return false;
  }

  if (!syncState.restored || !pVnode->restored) {
    terrno = TSDB_CODE_SYN_RESTORING;
    vInfo("vgId:%d, vnode not restored:%d:%d", pVnode->config.vgId, syncState.restored, pVnode->restored);
    return false;
  }

  return true;
}
861

UNCOV
862
// Return the cluster id recorded for this node's own entry in the sync config.
int64_t vnodeClusterId(SVnode *pVnode) {
  const SSyncCfg *cfg = &pVnode->config.syncCfg;
  return cfg->nodeInfo[cfg->myIndex].clusterId;
}
866

867
// Return the dnode id recorded for this node's own entry in the sync config.
int32_t vnodeNodeId(SVnode *pVnode) {
  const SSyncCfg *cfg = &pVnode->config.syncCfg;
  return cfg->nodeInfo[cfg->myIndex].nodeId;
}
871

872
// Fill *pSnap with this vnode's snapshot description: last applied
// index/term from the committed state, and completeness derived from the
// tsdb filesystem state. For snapshot-preparation message types, also asks
// the tsdb layer to populate the prep description.
// Returns 0 on success, else the error from tsdbSnapPrepDescription().
int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnap) {
  int32_t code = 0;  // int32_t for consistency with the rest of this file's return codes
  pSnap->lastApplyIndex = pVnode->state.committed;
  pSnap->lastApplyTerm = pVnode->state.commitTerm;
  pSnap->lastConfigIndex = -1;
  pSnap->state = SYNC_FSM_STATE_COMPLETE;

  // A non-normal tsdb fs state means the on-disk snapshot is incomplete.
  if (tsdbSnapGetFsState(pVnode) != TSDB_FS_STATE_NORMAL) {
    pSnap->state = SYNC_FSM_STATE_INCOMPLETE;
  }

  if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
    code = tsdbSnapPrepDescription(pVnode, pSnap);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc